signatures.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import hashlib
  2. from typing import Any
  3. from urllib.parse import urlparse
  4. import datetime
  5. from base64 import b64encode, b64decode
  6. from Crypto import Random
  7. from Crypto.PublicKey import RSA
  8. from Crypto.Signature import pkcs1_15 # pylint: disable=no-name-in-module
  9. from Crypto.Hash import SHA256
  10. MAX_SIGNATURE_AGE = 300
  11. def create_key_pair() -> tuple(str, str):
  12. """Creates new key pair for a new user"""
  13. random_generator = Random.new().read
  14. key = RSA.generate(1024, random_generator)
  15. private_key = key.export_key().decode("utf8")
  16. public_key = key.publickey().export_key().decode("utf8")
  17. return private_key, public_key
  18. def make_signature(sender, destination, date, digest):
  19. """Sign outgoing message with a private key"""
  20. inbox_parts = urlparse(destination)
  21. signature_headers = [
  22. f"(request-target): post {inbox_parts.path}",
  23. f"host: {inbox_parts.netloc}",
  24. f"date: {date}",
  25. f"digest: {digest}",
  26. ]
  27. message_to_sign = "\n".join(signature_headers)
  28. signer = pkcs1_15.new(RSA.import_key(sender.key_pair.private_key))
  29. signed_message = signer.sign(SHA256.new(message_to_sign.encode("utf8")))
  30. signature = {
  31. "keyId": f"{sender.remote_id}#main-key",
  32. "algorithm": "rsa-sha256",
  33. "headers": "(request-target) host date digest",
  34. "signature": b64encode(signed_message).decode("utf8"),
  35. }
  36. return ",".join(f'{k}="{v}"' for (k, v) in signature.items())
  37. def make_digest(data):
  38. """creates a message digest for signing"""
  39. return "SHA-256=" + b64encode(
  40. hashlib.sha256(data.encode("utf-8")).digest()
  41. ).decode("utf-8")
  42. def verify_digest(request):
  43. """checks if a digest is syntactically valid and matches the message"""
  44. algorithm, digest = request.headers["digest"].split("=", 1)
  45. if algorithm == "SHA-256":
  46. hash_function = hashlib.sha256
  47. elif algorithm == "SHA-512":
  48. hash_function = hashlib.sha512
  49. else:
  50. raise ValueError(f"Unsupported hash function: {algorithm}")
  51. expected = hash_function(request.body).digest()
  52. if b64decode(digest) != expected:
  53. raise ValueError("Invalid HTTP Digest header")
  54. class Signature:
  55. """read and validate incoming signatures"""
  56. def __init__(self, key_id, headers, signature):
  57. self.key_id = key_id
  58. self.headers = headers
  59. self.signature = signature
  60. # pylint: disable=invalid-name
  61. @classmethod
  62. def parse(cls, signature: str):
  63. """Extract and parse signature from an HTTP request signature string"""
  64. signature_dict = {}
  65. for pair in signature.split(","):
  66. k, v = pair.split("=", 1)
  67. v = v.replace('"', "")
  68. signature_dict[k] = v
  69. try:
  70. key_id = signature_dict["keyId"]
  71. headers = signature_dict["headers"]
  72. signature = b64decode(signature_dict["signature"])
  73. except KeyError:
  74. raise ValueError("Invalid auth header")
  75. return cls(key_id, headers, signature)
  76. def verify(self, public_key, date, request):
  77. """Verify RSA signature using a public key"""
  78. """verify rsa signature"""
  79. if http_date_age(request.headers["date"]) > MAX_SIGNATURE_AGE:
  80. raise ValueError(f"Request too old: {request.headers['date']}")
  81. public_key = RSA.import_key(public_key)
  82. comparison_string = []
  83. for signed_header_name in self.headers.split(" "):
  84. if signed_header_name == "(request-target)":
  85. comparison_string.append(
  86. f"(request-target): post {request.path}"
  87. )
  88. else:
  89. if signed_header_name == "digest":
  90. verify_digest(request)
  91. comparison_string.append(
  92. f"{signed_header_name}: {request.headers[signed_header_name]}"
  93. )
  94. comparison_string = "\n".join(comparison_string)
  95. signer = pkcs1_15.new(public_key)
  96. digest = SHA256.new()
  97. digest.update(comparison_string.encode())
  98. # raises a ValueError if it fails
  99. signer.verify(digest, self.signature)
  100. def http_date_age(datestr):
  101. """age of a signature in seconds"""
  102. parsed = datetime.datetime.strptime(datestr, "%a, %d %b %Y %H:%M:%S GMT")
  103. delta = datetime.datetime.utcnow() - parsed
  104. return delta.total_seconds()