"""Crypto helpers: KIS WebSocket payload decryption and DB field encryption."""

import hashlib
import logging
from base64 import b64decode, b64encode

from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad, pad

from app.core.config import settings

logger = logging.getLogger(__name__)

# Layout of an encrypted DB value before base64 encoding:
#   nonce (16 bytes) | tag (16 bytes) | ciphertext (n bytes)
_GCM_NONCE_SIZE = 16
_GCM_TAG_SIZE = 16
_GCM_HEADER_SIZE = _GCM_NONCE_SIZE + _GCM_TAG_SIZE


# KIS WebSocket Decryption (CBC)
def aes_cbc_base64_dec(key: str, iv: str, cipher_text: str) -> str:
    """Decrypt a base64-encoded KIS WebSocket payload using AES in CBC mode.

    Adapted from the KIS official sample (described there as AES-256-CBC).

    Args:
        key: Cipher key; encoded to bytes as UTF-8 before use.
        iv: Initialization vector; encoded to bytes as UTF-8 before use.
        cipher_text: Base64-encoded ciphertext.

    Returns:
        The decrypted, unpadded payload decoded as UTF-8.

    Raises:
        ValueError: If ``key`` or ``iv`` is empty. Errors raised by the
            underlying base64 / cipher / unpad calls are not caught here.
    """
    if not key or not iv:
        raise ValueError("Key and IV are required for decryption")

    # Key and IV are assumed to be utf-8 strings
    cipher = AES.new(key.encode("utf-8"), AES.MODE_CBC, iv.encode("utf-8"))

    # Decrypt, then strip the block padding.
    decrypted_bytes = unpad(cipher.decrypt(b64decode(cipher_text)), AES.block_size)
    return decrypted_bytes.decode("utf-8")


# DB Field Encryption (AES-GCM)
def get_master_key() -> bytes:
    """Return the 32-byte key used for DB field encryption.

    The key is the SHA-256 digest of ``settings.SECRET_KEY``, which
    guarantees a 32-byte length regardless of the secret's own length.
    """
    return hashlib.sha256(settings.SECRET_KEY.encode("utf-8")).digest()


def encrypt_str(plain_text: str) -> str:
    """Encrypt a string for DB storage using AES-GCM.

    Args:
        plain_text: Text to encrypt. An empty value is returned as ``""``
            without being encrypted.

    Returns:
        ``base64(nonce + tag + ciphertext)`` as a UTF-8 string, or ``""``
        when ``plain_text`` is empty.
    """
    if not plain_text:
        return ""

    key = get_master_key()
    # No nonce is supplied, so the library generates one (16 bytes);
    # decrypt_str relies on that length when splitting the stored value.
    cipher = AES.new(key, AES.MODE_GCM)
    nonce = cipher.nonce
    ciphertext, tag = cipher.encrypt_and_digest(plain_text.encode("utf-8"))

    # Combined: nonce(16) + tag(16) + ciphertext(n)
    combined = nonce + tag + ciphertext
    return b64encode(combined).decode("utf-8")


def decrypt_str(encrypted_text: str) -> str:
    """Decrypt a string previously produced by ``encrypt_str``.

    Args:
        encrypted_text: ``base64(nonce + tag + ciphertext)``.

    Returns:
        The decrypted text. Returns ``""`` when the input is empty or the
        decoded data is shorter than the nonce + tag header, and the
        sentinel ``"[Decryption Failed]"`` when decoding, authentication,
        or UTF-8 decoding fails (for example, wrong key or a value that
        was never encrypted).
    """
    if not encrypted_text:
        return ""

    try:
        raw = b64decode(encrypted_text)
        if len(raw) < _GCM_HEADER_SIZE:
            # Too short to hold nonce + tag: treat as invalid data.
            return ""

        nonce = raw[:_GCM_NONCE_SIZE]
        tag = raw[_GCM_NONCE_SIZE:_GCM_HEADER_SIZE]
        ciphertext = raw[_GCM_HEADER_SIZE:]

        key = get_master_key()
        cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
        decrypted_data = cipher.decrypt_and_verify(ciphertext, tag)
        return decrypted_data.decode("utf-8")
    except Exception:
        # Failed to decrypt (possibly not encrypted, or wrong key). Input is
        # always assumed to be encrypted; there is no plain-text fallback.
        # Keep the best-effort sentinel for callers, but leave a trace so
        # key mismatches and corrupt rows are not silently hidden. The
        # stored value itself is deliberately not logged.
        logger.warning("Failed to decrypt stored value", exc_info=True)
        return "[Decryption Failed]"