74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import unpad, pad
|
|
from base64 import b64decode, b64encode
|
|
from app.core.config import settings
|
|
import hashlib
|
|
|
|
# KIS WebSocket Decryption (CBC)
|
|
def aes_cbc_base64_dec(key: str, iv: str, cipher_text: str) -> str:
    """Decrypt a base64-encoded KIS WebSocket payload with AES in CBC mode.

    Adapted from the KIS official sample. The AES variant follows from the
    length of the encoded key (AES-256 for a key that encodes to 32 bytes).

    Args:
        key: Symmetric key as text; its UTF-8 bytes are used as the AES key.
        iv: Initialization vector as text; its UTF-8 bytes are used as the IV.
        cipher_text: Base64-encoded ciphertext.

    Returns:
        The decrypted payload, with block padding removed, decoded as UTF-8.

    Raises:
        ValueError: If ``key`` or ``iv`` is empty or ``None``. Errors raised
            while base64-decoding, decrypting, unpadding or UTF-8 decoding
            are not caught here and propagate to the caller.
    """
    if not (key and iv):
        raise ValueError("Key and IV are required for decryption")

    # Key and IV arrive as plain text and are used as their UTF-8 bytes.
    key_bytes = key.encode("utf-8")
    iv_bytes = iv.encode("utf-8")
    decryptor = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)

    # base64 -> raw ciphertext -> padded plaintext -> plaintext bytes.
    encrypted_bytes = b64decode(cipher_text)
    padded_plain = decryptor.decrypt(encrypted_bytes)
    plain_bytes = unpad(padded_plain, AES.block_size)

    return plain_bytes.decode("utf-8")
|
|
|
|
# DB Field Encryption (AES-GCM)
|
|
def get_master_key():
    """Return the 32-byte AES key used for DB field encryption.

    The key is the SHA-256 digest of ``settings.SECRET_KEY`` (UTF-8 encoded),
    so it is always 32 bytes long regardless of the secret's length.
    """
    secret_bytes = settings.SECRET_KEY.encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(secret_bytes)
    return hasher.digest()
|
|
|
|
def encrypt_str(plain_text: str) -> str:
    """Encrypt a string for DB storage using AES-GCM.

    Args:
        plain_text: Text to encrypt; it is UTF-8 encoded before encryption.

    Returns:
        ``base64(nonce + tag + ciphertext)`` as text, or ``""`` when
        ``plain_text`` is empty (nothing is encrypted in that case).
    """
    if not plain_text:
        return ""

    master_key = get_master_key()

    # No nonce is supplied, so the cipher object provides one; it is read
    # back here so it can be stored next to the ciphertext.
    encryptor = AES.new(master_key, AES.MODE_GCM)
    nonce = encryptor.nonce  # 16 bytes, matching the slicing in decrypt_str

    sealed, auth_tag = encryptor.encrypt_and_digest(plain_text.encode("utf-8"))

    # Stored layout: nonce(16) + tag(16) + ciphertext(n)
    payload = b"".join((nonce, auth_tag, sealed))
    return b64encode(payload).decode("utf-8")
|
|
|
|
def decrypt_str(encrypted_text: str) -> str:
    """Decrypt a string read from the DB.

    Args:
        encrypted_text: ``base64(nonce + tag + ciphertext)`` text.

    Returns:
        The decrypted UTF-8 text. Returns ``""`` when the input is empty or
        decodes to fewer than 32 bytes, and the literal
        ``"[Decryption Failed]"`` when any exception is raised while
        decoding, decrypting or verifying the data.
    """
    if not encrypted_text:
        return ""

    nonce_end = 16   # nonce occupies bytes [0, 16)
    header_end = 32  # tag occupies bytes [16, 32); ciphertext follows

    try:
        blob = b64decode(encrypted_text)
        if len(blob) < header_end:
            # Too short to hold both nonce and tag: treat as invalid data.
            return ""

        nonce, auth_tag, sealed = (
            blob[:nonce_end],
            blob[nonce_end:header_end],
            blob[header_end:],
        )

        master_key = get_master_key()
        decryptor = AES.new(master_key, AES.MODE_GCM, nonce=nonce)

        plain_bytes = decryptor.decrypt_and_verify(sealed, auth_tag)
        return plain_bytes.decode("utf-8")
    except Exception:
        # Decryption failed (the value may not be encrypted, or the key may
        # be wrong). The value is assumed to be encrypted, so no plain-text
        # fallback is attempted; a fixed marker is returned instead of raising.
        return "[Decryption Failed]"
|