
A Coding Guide to Building an AI-Powered Cryptographic Agent System with Hybrid Encryption, Digital Signatures, and Adaptive Security Intelligence

In this tutorial, we build an AI-powered cryptographic agent system that combines classical encryption with adaptive intelligence. We design agents that perform hybrid encryption with RSA and AES, generate digital signatures, detect anomalies in message patterns, and recommend key rotations. As we progress, we watch these agents securely establish communication channels, exchange encrypted data, and continuously assess security risks in real time.

import hashlib, hmac, json, time, secrets, numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend


@dataclass
class SecurityEvent:
   timestamp: float
   event_type: str
   risk_score: float
   details: Dict

We start by importing the libraries needed for cryptography, AI-based analysis, and data handling. We also define a SecurityEvent dataclass to record and analyze the important events in our cryptographic system.
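As a minimal sketch of how such event records can be queried, the snippet below redefines the same dataclass and filters a small, made-up event list by risk score. The helper name `high_risk` and the sample events are illustrative assumptions, not part of the tutorial's code; the 0.7 threshold mirrors the one used later in the security report.

```python
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SecurityEvent:
    timestamp: float
    event_type: str
    risk_score: float
    details: Dict


def high_risk(events: List[SecurityEvent], threshold: float = 0.7) -> List[SecurityEvent]:
    """Hypothetical helper: return events whose risk score exceeds the threshold."""
    return [e for e in events if e.risk_score > threshold]


# Made-up sample events, for illustration only.
events = [
    SecurityEvent(time.time(), "SESSION_ESTABLISHED", 0.1, {"partner": "Bob"}),
    SecurityEvent(time.time(), "HIGH_RISK_MESSAGE", 0.8, {"sender": "Alice"}),
]

flagged = high_risk(events)
print([e.event_type for e in flagged])
```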

class CryptoAgent:
   def __init__(self, agent_id: str):
       self.agent_id = agent_id
       self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
       self.public_key = self.private_key.public_key()
       self.session_keys = {}
       self.security_events = []
       self.encryption_count = 0
       self.key_rotation_threshold = 100


   def get_public_key_bytes(self) -> bytes:
       return self.public_key.public_bytes(
           encoding=serialization.Encoding.PEM,
           format=serialization.PublicFormat.SubjectPublicKeyInfo
       )


   def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
       session_key = secrets.token_bytes(32)
       self.session_keys[partner_id] = session_key
       partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
       encrypted_session_key = partner_public_key.encrypt(
           session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
       return encrypted_session_key


   def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
       session_key = self.private_key.decrypt(
           encrypted_session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.session_keys[partner_id] = session_key

We define the CryptoAgent class and initialize its RSA key pair, session-key storage, and security event log. We then implement the methods that let agents export their RSA public keys and establish hybrid-encrypted sessions: one agent generates a random 256-bit AES session key and wraps it with the partner's public key using RSA-OAEP, and the partner unwraps it with its own private key.
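The key-wrapping step can be tried in isolation. The following is a minimal sketch, assuming a recent version of the cryptography package in which the backend argument is optional; the variable names (`recipient_private`, `wrapped`, `unwrapped`) are illustrative and do not appear in the class above.

```python
import secrets

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

# Same OAEP parameters as establish_session / receive_session_key.
OAEP = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)

# The recipient owns the RSA key pair; the sender only needs the public half.
recipient_private = rsa.generate_private_key(public_exponent=65537, key_size=2048)
recipient_public = recipient_private.public_key()

# Sender side: create a random 256-bit AES key and wrap it with RSA-OAEP.
session_key = secrets.token_bytes(32)
wrapped = recipient_public.encrypt(session_key, OAEP)

# Recipient side: unwrap with the private key.
unwrapped = recipient_private.decrypt(wrapped, OAEP)

print(len(wrapped))              # RSA ciphertext is as long as the modulus
print(unwrapped == session_key)
```

With a 2048-bit modulus the wrapped key is 256 bytes, which is the figure the demo prints when the session is established.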

   def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
       if partner_id not in self.session_keys:
           raise ValueError(f"No session established with {partner_id}")
       self.encryption_count += 1
       if self.encryption_count >= self.key_rotation_threshold:
           self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
       iv = secrets.token_bytes(12)
       cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
       encryptor = cipher.encryptor()
       ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
       message_data = iv + ciphertext + encryptor.tag
       signature = self.sign_data(message_data)
       risk_score = self.analyze_encryption_pattern(len(plaintext))
       return {
           "sender": self.agent_id,
           "recipient": partner_id,
           "iv": iv.hex(),
           "ciphertext": ciphertext.hex(),
           "tag": encryptor.tag.hex(),
           "signature": signature.hex(),
           "timestamp": time.time(),
           "risk_score": risk_score
       }


   def decrypt_message(self, encrypted_msg: Dict) -> str:
       sender_id = encrypted_msg["sender"]
       if sender_id not in self.session_keys:
           raise ValueError(f"No session established with {sender_id}")
       iv = bytes.fromhex(encrypted_msg["iv"])
       ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
       tag = bytes.fromhex(encrypted_msg["tag"])
       cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
       decryptor = cipher.decryptor()
       plaintext = decryptor.update(ciphertext) + decryptor.finalize()
       if encrypted_msg.get("risk_score", 0) > 0.7:
           self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
       return plaintext.decode()

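We implement the encryption and decryption logic that lets agents communicate securely using AES-GCM. Each message is encrypted, signed, risk-scored, and decrypted by the recipient, with the GCM authentication tag protecting integrity alongside confidentiality. Note that decrypt_message as written relies on the GCM tag alone and does not verify the RSA signature carried in the message.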
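Since the message dictionary carries an RSA-PSS signature over iv + ciphertext + tag, a recipient that holds the sender's public key could check it before decrypting. The sketch below shows one way to do that; `verify_message_signature` is a hypothetical helper, not a method of the tutorial's class, and the message fields are dummy bytes chosen for illustration.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa


def pss_padding() -> padding.PSS:
    # Same PSS parameters that sign_data uses.
    return padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)


def verify_message_signature(sender_public_key_pem: bytes, encrypted_msg: dict) -> bool:
    """Hypothetical helper: check the signature over iv + ciphertext + tag."""
    public_key = serialization.load_pem_public_key(sender_public_key_pem)
    signed_data = (
        bytes.fromhex(encrypted_msg["iv"])
        + bytes.fromhex(encrypted_msg["ciphertext"])
        + bytes.fromhex(encrypted_msg["tag"])
    )
    try:
        public_key.verify(
            bytes.fromhex(encrypted_msg["signature"]),
            signed_data,
            pss_padding(),
            hashes.SHA256(),
        )
        return True
    except InvalidSignature:
        return False


# Build a stand-in message with dummy field values (not real AES-GCM output).
sender_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
sender_pem = sender_key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
iv, ciphertext, tag = b"\x00" * 12, b"example-ciphertext", b"\x01" * 16
signature = sender_key.sign(iv + ciphertext + tag, pss_padding(), hashes.SHA256())
msg = {"iv": iv.hex(), "ciphertext": ciphertext.hex(), "tag": tag.hex(), "signature": signature.hex()}

valid = verify_message_signature(sender_pem, msg)
tampered = dict(msg, ciphertext=b"altered-ciphertext".hex())
rejected = not verify_message_signature(sender_pem, tampered)
print(valid, rejected)
```

In the agent class, this would require each agent to keep its partner's public key from the session setup, which the code above does not currently store.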

   def sign_data(self, data: bytes) -> bytes:
       return self.private_key.sign(
           data,
           padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
           hashes.SHA256()
       )


   def analyze_encryption_pattern(self, message_length: int) -> float:
       recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
       avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
       risk_score = 0.1
       if message_length > 10000:
           risk_score += 0.3
       if self.encryption_count % 50 == 0 and self.encryption_count > 0:
           risk_score += 0.2
       risk_score = (risk_score + avg_risk) / 2
       self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
       return min(risk_score, 1.0)


   def log_security_event(self, event_type: str, risk_score: float, details: Dict):
       event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, details=details)
       self.security_events.append(event)


   def generate_security_report(self) -> Dict:
       if not self.security_events:
           return {"status": "No events recorded"}
       total_events = len(self.security_events)
       high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
       avg_risk = np.mean([e.risk_score for e in self.security_events])
       event_types = {}
       for event in self.security_events:
           event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
       return {
           "agent_id": self.agent_id,
           "total_events": total_events,
           "high_risk_events": len(high_risk_events),
           "average_risk_score": round(avg_risk, 3),
           "encryption_count": self.encryption_count,
           "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
           "event_breakdown": event_types,
           "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
       }

We include the AI-driven analysis components that examine encryption behavior, detect anomalies, and log security events. The risk score blends rule-based signals (a message longer than 10,000 characters, every 50th encryption) with the average risk of the ten most recent events. This snippet gives our agent adaptive intelligence: the ability to flag unusual patterns and to generate a comprehensive security report.
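To make the scoring arithmetic concrete, here is a standalone sketch that mirrors the rules in analyze_encryption_pattern without the class or logging. The function name `score_message` and its parameters are assumptions introduced for illustration, and it uses statistics.mean in place of NumPy.

```python
from statistics import mean
from typing import List


def score_message(message_length: int, encryption_count: int, recent_risks: List[float]) -> float:
    """Hypothetical standalone version of the tutorial's risk rules."""
    # Average risk of up to the ten most recent events; 0.1 if there are none.
    avg_risk = mean(recent_risks[-10:]) if recent_risks else 0.1
    risk = 0.1                                  # baseline
    if message_length > 10000:                  # unusually long message
        risk += 0.3
    if encryption_count > 0 and encryption_count % 50 == 0:  # every 50th encryption
        risk += 0.2
    return min((risk + avg_risk) / 2, 1.0)


# First short message after a session event with risk 0.1: (0.1 + 0.1) / 2
baseline = score_message(36, 1, [0.1])

# The demo's "very long" message is 28 * 100 = 2800 characters, below the
# 10,000 threshold, so it scores the same as a short message.
demo_long = score_message(2800, 4, [0.1])

# Both rules firing: (0.1 + 0.3 + 0.2 + 0.1) / 2
both_rules = score_message(12000, 50, [0.1])

print(f"{baseline:.3f} {demo_long:.3f} {both_rules:.3f}")
```

Because the score is averaged with recent history, a single anomalous message raises the score only moderately, while a run of risky events pushes later scores up as well.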

def demo_crypto_agent_system():
   print("🔐 Advanced Cryptographic Agent System Demo")
   print("=" * 60)
   alice = CryptoAgent("Alice")
   bob = CryptoAgent("Bob")
   print("\n1. Agents Created")
   print(f"   Alice ID: {alice.agent_id}")
   print(f"   Bob ID: {bob.agent_id}")
   print("\n2. Establishing Secure Session (Hybrid Encryption)")
   alice_public_key = alice.get_public_key_bytes()
   bob_public_key = bob.get_public_key_bytes()
   encrypted_session_key = alice.establish_session("Bob", bob_public_key)
   bob.receive_session_key("Alice", encrypted_session_key)
   print(f"   ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
   print("\n3. Encrypting and Transmitting Messages")
   messages = [
       "Hello Bob! This is a secure message.",
       "The launch codes are: Alpha-7-Charlie-9",
       "Meeting at 3 PM tomorrow.",
       "This is a very long message " * 100
   ]
   for i, msg in enumerate(messages, 1):
       encrypted = alice.encrypt_message("Bob", msg)
       print(f"\n   Message {i}:")
       print(f"   - Plaintext length: {len(msg)} chars")
       print(f"   - Ciphertext: {encrypted['ciphertext'][:60]}...")
       print(f"   - Risk Score: {encrypted['risk_score']:.3f}")
       print(f"   - Signature: {encrypted['signature'][:40]}...")
       decrypted = bob.decrypt_message(encrypted)
       print(f"   - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
       print(f"   - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
   print("\n4. AI-Powered Security Analysis")
   print("\n   Alice's Security Report:")
   alice_report = alice.generate_security_report()
   for k, v in alice_report.items(): print(f"   - {k}: {v}")
   print("\n   Bob's Security Report:")
   bob_report = bob.generate_security_report()
   for k, v in bob_report.items(): print(f"   - {k}: {v}")
   print("\n" + "=" * 60)
   print("Demo Complete! Key Features Demonstrated:")
   print("✓ Hybrid encryption (RSA + AES-GCM)")
   print("✓ Digital signatures for authentication")
   print("✓ AI-powered anomaly detection")
   print("✓ Intelligent key rotation recommendations")
   print("✓ Real-time security monitoring")


if __name__ == "__main__":
   demo_crypto_agent_system()

We demonstrate a full cryptographic workflow in which two agents exchange messages securely and produce detailed security reports. We conclude the demo with an overview of the features the system implements.
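The demo exercises only the successful path. As a complementary sketch, the snippet below uses the same Cipher API to show what AES-GCM does when a ciphertext is altered in transit: decryption raises InvalidTag instead of returning corrupted plaintext. The `decrypt` helper and the bit-flip are illustrative assumptions, and a recent cryptography version (optional backend argument) is assumed.

```python
import secrets

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

key = secrets.token_bytes(32)   # 256-bit session key
iv = secrets.token_bytes(12)    # 96-bit nonce, as in encrypt_message

# Encrypt one message and capture the authentication tag.
encryptor = Cipher(algorithms.AES(key), modes.GCM(iv)).encryptor()
ciphertext = encryptor.update(b"Meeting at 3 PM tomorrow.") + encryptor.finalize()
tag = encryptor.tag


def decrypt(ct: bytes) -> bytes:
    """Decrypt with the key, nonce, and tag above; finalize() checks the tag."""
    decryptor = Cipher(algorithms.AES(key), modes.GCM(iv, tag)).decryptor()
    return decryptor.update(ct) + decryptor.finalize()


recovered = decrypt(ciphertext)

# Flip one bit of the first ciphertext byte to simulate tampering.
tampered = bytes([ciphertext[0] ^ 0x01]) + ciphertext[1:]
try:
    decrypt(tampered)
    tamper_detected = False
except InvalidTag:
    tamper_detected = True

print(recovered.decode(), tamper_detected)
```

A caller of decrypt_message would therefore want to catch InvalidTag and log it as a security event rather than let it propagate unhandled.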

In conclusion, we show how artificial intelligence can enhance traditional cryptography by introducing adaptability and contextual awareness. We not only encrypt and authenticate messages but also enable our agents to learn from communication behavior and adjust security measures accordingly. In the end, we see how AI-driven analysis combined with hybrid encryption lays the groundwork for a new generation of intelligent, self-monitoring cryptographic systems.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and developer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of an artificial intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understood by a wide audience. The platform attracts more than two million monthly views, which shows its popularity among readers.
