多くの大手テクノロジー企業は、ユーザーが複数のデバイスでサインインできる便利な機能を提供しています。ユーザーは自分のデバイスを管理し、どのデバイスがサインインしているかを確認できるほか、サインインしたデバイスを使用して任意のデバイスからサインアウトすることもできます。今日は、Redis と JWT を使用して同様の認証システムを実装する方法を探りたいと思います。
このため、私は以下を使用することにしました。
バックエンド用の高速API
キャッシュ用のRedis
リクエストを承認するために JWT (JSON Web Tokens) を使用します。アプリケーションはステートレス (つまり、以前のリクエストの記憶がない) なので、セッションとユーザー データを送信する方法が必要です。JWT はステートレス アプリケーションでの認証の管理に最適で、非常に役立ちます。
ただし、JWT の欠点の 1 つは、トークンに含めるペイロードが増えるほど、トークンが長くなることです。私たちのシステムでは、トークンにsession_id
とusername
のみを含めることにしました。この情報は、トークンを過度に大きくすることなく、リクエストを承認するのに十分です。リクエストを承認するには、JWT (JSON Web Tokens) を使用します。私たちのアプリケーションはステートレス (つまり、以前のリクエストのメモリがない) であるため、セッションとユーザー データを送信する方法が必要です。JWT は、ステートレス アプリケーションでの認証の管理に最適であり、この点で優れた機能を発揮します。
この文脈では、「セッション」とは、ユーザーがアプリとやり取りするデバイスまたは手段を指します。基本的には、ユーザーがログインしているデバイスです。ユーザーがログイン要求を行うたびに、関連するすべてのデバイス情報を含む新しいセッション (デバイス) がシステムに作成されます。このデータは、将来の要求に備えて Redis に保存されます。
まず最初に、ローカル マシンに Redis がインストールされていることを確認します。Redis をインストールするには、 https://redis.io/docs/latest/operate/oss_and_stack/install/install-redis/にアクセスし、オペレーティング システム固有の手順に従ってください。
次に、Python をインストールします。このために、Python 3.11 を使用します (まだ 3.12 にアップグレードする必要性を感じていません。率直に言って、3.11 を使用する唯一の理由は StrEnum のためです。それ以外は、まだ 3.10 が好きです)。
次に、poetryをインストールする必要があります。これは私が使用するパッケージマネージャーです。
pip install poetry # or python3.11 -m install poetry
これで決まりです。リポジトリをクローンしてください。
git clone https://github.com/emperorsixpacks/multi_device_sign_in_with_redis.git && cd server poetry shell && poetry install # create a new virtual environment and install all dependanceies
import os from redis import Redis load_dotenv(".env") REDIS_HOST = os.getenv("REDIS_HOST", "localhost") REDIS_PORT = os.getenv("REDIS_PORT", "6379") redis_client = Redis(REDIS_HOST, int(REDIS_PORT))
このチュートリアルで使用するデモ データベースをdemo_users.jsonに作成しました。
{ "user124": { "username": "user124", "email": "[email protected]", "password": "1234", "bio": "This is my brief bio" }, "user123": { "username": "user123", "email": "[email protected]", "password": "1234", "bio": "This is my brief bio" } }
ここで、データベースのスキーマとヘルパー関数を追加する必要があります。簡潔にするため、ここではすべてのコードを記載しません。
@dataclass class Session: """ A class to represent a user's session. Attributes: session_id (str): A unique id for the session. device_name (str): The name of the device used for the session. ip_address (str): The ip address of the device used for the session. device_id (str): A unique id for the device. date_created (datetime): The date and time the session was created. """ session_id: str = field(default_factory=create_new_session_id) device_name: str = field(default=None) ip_address: str = field(default=None) device_id: str = field(default_factory=generate_new_device_id) date_created: datetime = field(default_factory=now_date_time_to_str) @dataclass class User: """ A class to represent a user. Attributes: username (str): The username of the user. email (str): The email of the user. password (str): The password of the user. bio (str): The bio of the user. sessions (List[Session] | None): A list of Session objects representing the user's sessions. """ username: str = field(default=None) email: str = field(default=None) password: str = field(default=None) bio: str = field(default=None) sessions: List[Session] | None = None @property def __dict__(self): """ Returns a dictionary representing the user. Returns: Dict[str, Any]: A dictionary representing the user """ return { "username": self.username, "email": self.email, "password": self.password, "bio": self.bio, "sessions": self.return_session_dict(), } def return_session_dict(self): """ Returns a list of dictionaries representing the user's sessions. If the sessions field is a list of Session objects, returns a list of dictionaries where each dictionary is the __dict__ of a Session object. If the sessions field is a list of dictionaries, returns the list as is. Returns: List[Dict[str, Any]]: A list of dictionaries representing the user's sessions """ try: return [session.__dict__ for session in self.sessions] except AttributeError: return [session for session in self.sessions] # Utiliy finctions def return_user_from_db(username) -> User | None: """ Retrieves a user from the database by their username. Args: username (str): The username of the user to be retrieved Returns: User | None: The user if found, None otherwise """ with open("demo_users.json", "r", encoding="utf-8") as file: user = json.load(file).get(str(username), None) return User(**user) or None
私たちはFastAPIを使ってアプリを運営しているので、それを設定してみましょう。
# Setting up server from fastapi import FastAPI from fastapi.responses import JSONResponse app = FastAPI( name="Multi device sign in with Redis", description="Multi device sign in with Redis in stateless applications", ) @app.get("/") def index_route(): return JSONResponse(content={"Message": "hello, this seems to be working :)"}) if __name__ == "__main__": import uvicorn uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True, use_colors=True)
よし、これは良いことだ。私たちのアプリケーションはうまくいっているようだ
ユーザーがシステムにログインするたびに、 session_id
生成し、そのセッションを他のすべてのセッションとともに Redis に保存する方法が必要です。
ユーザーがログインすると、まずリクエストを認証して有効であることを確認します。検証が完了すると、リクエストからすべてのデバイス情報を取得できます。その後、この情報を Redis に保存し、新しいトークンを生成して、そのトークンをユーザーに返します。
@app.post("/login") def login_route( form: Annotated[LoginForm, Depends()], request: Request ) -> JSONResponse: """ Handles a login request. Args: form (Annotated[LoginForm, Depends()]): The form data containing the username and password request (Request): The request containing the User-Agent header and client host Returns: JSONResponse: A JSON response containing a JWT token if the login is successful, otherwise a JSONResponse with a 404 status code and a message indicating that the username or password is invalid """ username = form.username password = form.password # Authenticate the user user = authenticate_user(username, password) if user is None: return JSONResponse( status_code=404, content={"message": "Invalid username or password"} ) # Create a new session session = Session( device_name=request.headers.get("User-Agent"), ip_address=request.client.host ) # Get the user from the cache user_from_cache = get_user_from_cache(username) if user_from_cache is None: return JSONResponse(content={"message": "one minute"}, status_code=404) # Get the user's sessions user_sessions = get_sessions(userid=username) # Add the new session to the user's sessions try: user_sessions.append(session) except AttributeError: user_sessions = [session] # Update the user in the cache user_from_cache.sessions = user_sessions update_user_cache(userid=username, new_data=user_from_cache) # Create a JWT token token = create_token(Token(user=username, session_id=session.session_id)) # Return the JWT token return JSONResponse(content={"message": "logged in", "token": token})
これは簡単な部分です。ユーザーがアプリにリクエストを送信するたびに、Bearer トークンをデコードしてsession_id
とusername
を取得します。その後、 username
使用して Redis にクエリを実行できます。
一致が見つかった場合、デコードされたトークンからsession_id
に関連付けられたセッションを削除します。たとえば、セッションが存在しない場合は、ユーザーにメッセージを返すだけです。これは、ユーザーが別のデバイスからそのデバイスから既にログアウトしているか、トークンが無効であることを示します。
@app.post("/logout") def logout_route(request: Request): """ Handles a request to log out the user. This endpoint will delete the user's session from the cache and return a JSON response with a message indicating that the user has been logged out. Args: request (Request): The request containing the Authorization header with the JWT token Returns: JSONResponse: A JSON response containing the message "logged out" if the token is valid, otherwise a JSONResponse with a 404 status code and a message indicating that the token is invalid """ # Get the JWT token from the Authorization header _, token = get_authorization_scheme_param(request.headers.get("Authorization")) # Decode the JWT token payload = decode_token(token) # Check if the token is invalid if payload is None: return JSONResponse(content={"message": "Invalid token"}, status_code=404) # Check if the user or session does not exist if get_single_session(userid=payload.user, session_id=payload.session_id) is None or get_user_from_cache( userid=payload.user) is None: return JSONResponse(content={"message": "Invalid token"}, status_code=404) # Delete the session from the cache delete_session(payload.user, payload.session_id) # Return a JSON response with a message indicating that the user has been logged out return JSONResponse(content={"message": "logged out"})
そうですね、それほど難しくなかったですね。このプロジェクトはここ数週間頭にありました。テストしたかったのです。このシステムは完全に完璧というわけではありませんが (つまり、欠陥のないシステムなどありません)、明らかに改善できます。たとえば、Curl やコンソール アプリ、さらには Postman などの場所からのリクエストをどのように管理するのでしょうか。これらのソースからの複数のリクエストは多くのセッションにつながり、その結果、データベースに不要なデータが取り込まれることになります。はい、リクエストの送信元を確認して、それを処理するロジックを作成することもできますが、正直に言うと、それは大変な作業です。そのため、本物の「agba」(シニア エンジニア) でない限り、本番アプリ用の認可および認証システムを構築することはお勧めしません。OAuth 2.0 (Google または Apple) やKindeやAuth0などの外部プロバイダーを使用することをお勧めします。また、私のようにお金がなくEdgeDB を使用している場合は、すぐに使用できる認証システムが付属しています。こうすることで、何かが起こった場合に、インターン生だけではなく他の誰かを責めることができます 🙂。