123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- from django.http import Http404
- from activitypub.signatures import Signature
- import json
- import logging
- import re
- from django.core.exceptions import BadRequest, PermissionDenied
- from django.utils.decorators import method_decorator
- from django.views import View
- from django.views.decorators.csrf import csrf_exempt
- from activitypub.exceptions import (
- BannedOrDeletedActorException,
- BlockedActorException,
- BlockedUserAgentException,
- )
- from activitypub.models import FederatedServer
# Module-level logger named after this module; InboxView uses it for debug
# messages when a request is rejected.
- logger = logging.getLogger(__name__)
@method_decorator(csrf_exempt, name="dispatch")
# pylint: disable=no-self-use
class InboxView(View):
    """Receive activities POSTed to an inbox by outside servers.

    CSRF protection is disabled for this view because the requests come
    from other servers rather than from forms rendered by this instance.
    """

    def post(self, request, username=None):
        """Validate an incoming activity and hand it off for processing.

        Args:
            request: the incoming HTTP request; its body is parsed as JSON.
            username: optional local username whose inbox is addressed.
                When given, an active user with that localname must exist.

        Returns:
            An empty ``HttpResponse`` when the activity is accepted (or is
            an unverifiable ``Delete``), or a 401 response when the
            signature cannot be verified.

        Raises:
            BadRequest: the body is not valid JSON or is not a JSON object.
            Http404: the addressed user does not exist, or the activity has
                no ``object`` or no recognised ``type``.
            BlockedUserAgentException, BannedOrDeletedActorException,
            BlockedActorException: propagated from the block checks.
        """
        self.check_is_blocked_user_agent()

        # make sure the user's inbox even exists
        if username:
            get_object_or_404(User, localname=username, is_active=True)

        # is it valid json? does it at least vaguely resemble an activity?
        try:
            activity_json = json.loads(request.body)
        except ValueError as err:
            # JSONDecodeError (malformed JSON) and UnicodeDecodeError
            # (undecodable bytes) are both ValueError subclasses.
            raise BadRequest() from err
        if not isinstance(activity_json, dict):
            # Valid JSON that is not an object (list, string, number, ...)
            # cannot be an activity; reject it instead of crashing on .get().
            raise BadRequest()

        # let's be extra sure we didn't block this domain
        self.check_is_blocked_activity(activity_json)

        activity_type = activity_json.get("type")
        if (
            "object" not in activity_json
            # a non-string type can never name a known activity, and an
            # unhashable one would break the membership test on a mapping
            or not isinstance(activity_type, str)
            or activity_type not in activitypub.activity_objects
        ):
            raise Http404()

        # verify the signature
        if not self.is_signature_valid(activity_json):
            if activity_type == "Delete":
                # Pretend that unauth'd deletes succeed. Auth may be failing
                # because the resource or owner of the resource might have
                # been deleted.
                return HttpResponse()
            return HttpResponse(status=401)

        activity_task.delay(activity_json)
        return HttpResponse()

    def check_is_blocked_user_agent(self) -> None:
        """Raise an exception if a request is from a blocked server based on user agent

        Looks for an http(s) URL in the ``User-Agent`` header. Requests
        without the header, or without a URL in it, pass the check.

        Raises:
            BlockedUserAgentException: ``FederatedServer.is_blocked``
                reports the extracted URL as blocked.
        """
        user_agent = self.request.headers.get("User-Agent")
        if not user_agent:
            return

        match = re.search(rf"https?://{regex.DOMAIN}/?", user_agent)
        if not match:
            return

        url = match.group()
        if FederatedServer.is_blocked(url):
            logger.debug(
                "%s is blocked, denying request based on user agent", url
            )
            raise BlockedUserAgentException

    def check_is_blocked_activity(self, activity_json: dict) -> None:
        """Raise an exception if actor of an activity is blocked

        Activities without an ``actor`` pass the check.

        Args:
            activity_json: the parsed activity.

        Raises:
            BannedOrDeletedActorException: the actor matches a known user
                that is flagged as deleted or banned.
            BlockedActorException: ``FederatedServer.is_blocked`` reports
                the actor as blocked.
        """
        actor = activity_json.get("actor")
        if not actor:
            return

        # TODO Remove User hard-code and add an AP Profile
        existing = User.find_existing_by_remote_id(actor)
        if existing and (existing.deleted or existing.banned):
            logger.debug("%s is banned/deleted", actor)
            raise BannedOrDeletedActorException

        if FederatedServer.is_blocked(actor):
            logger.debug("%s is blocked", actor)
            raise BlockedActorException

    def is_signature_valid(self, activity) -> bool:
        """Return whether the request's signature verifies against its actor.

        The key id in the ``Signature`` header (minus any URL fragment) must
        equal the activity's ``actor``. If verification with the stored
        public key fails, the actor is re-resolved with ``refresh=True`` and
        verification is retried once, but only when the key has changed.

        Args:
            activity: the parsed activity (a dict-like object with ``get``).

        Returns:
            ``True`` when the signature verifies; ``False`` when the header
            is missing, the actor cannot be resolved, or parsing/verifying
            raises ``ValueError`` or an HTTP error.
        """
        signature_header = self.request.headers.get("Signature")
        if not signature_header:
            # An unsigned request is simply invalid, not a server error.
            return False

        try:
            signature = Signature.parse(signature_header)

            key_actor = urldefrag(signature.key_id).url
            if key_actor != activity.get("actor"):
                raise ValueError("Wrong actor created signature.")

            remote_user = resolve_remote_id(key_actor, model=User)
            if not remote_user:
                return False

            try:
                signature.verify(remote_user.key_pair.public_key, self.request)
            except ValueError:
                # The stored key may be stale: fetch the actor again and
                # retry once if the key actually changed.
                old_key = remote_user.key_pair.public_key
                remote_user = resolve_remote_id(
                    remote_user.remote_id, model=User, refresh=True
                )
                if not remote_user or remote_user.key_pair.public_key == old_key:
                    raise  # Refresh failed or key unchanged.
                signature.verify(remote_user.key_pair.public_key, self.request)
        except (ValueError, requests.exceptions.HTTPError):
            return False
        return True
|