from drf_yasg.utils import swagger_auto_schema from drf_yasg import openapi import base64 import json from rest_framework_simplejwt.views import TokenObtainPairView from rest_framework_simplejwt.tokens import RefreshToken from django.views.decorators.http import require_http_methods from rest_framework.response import Response from rest_framework import status from rest_framework.permissions import AllowAny, IsAuthenticated from django.utils import timezone from django.contrib.auth import get_user_model from .models import MasterKey from .models import Conversation from django.http import StreamingHttpResponse from .models import Message from .serializers import ConversationSerializer, MessageSerializer from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response from .utils import add_swagger_summaries from django.views import View import ollama from asgiref.sync import sync_to_async from django.utils.decorators import method_decorator import asyncio from django.views.decorators.csrf import csrf_exempt User = get_user_model() @add_swagger_summaries class ConversationView(viewsets.ModelViewSet): queryset = Conversation.objects.all() serializer_class = ConversationSerializer permission_classes = [IsAuthenticated] # JWT token auth required def get_queryset(self): queryset = Conversation.objects.all() return queryset def perform_create(self, serializer): """Associate new product with current user""" serializer.save(user=self.request.user) @swagger_auto_schema( method='get', operation_description="Get all the messages of the conversation", operation_summary="Get all messages", responses={ 200: openapi.Response('List of messages', MessageSerializer(many=True)), 400: 'Bad Request', 404: 'Conversation not found' }, manual_parameters=[ openapi.Parameter( 'category', openapi.IN_QUERY, description="Filter featured items by category", type=openapi.TYPE_STRING ) ] ) @action(detail=True, methods=['get']) def contents(self, request, pk=None): conversation = self.get_object() messages = conversation.messages.all() return Response(data=list(messages.values())) @method_decorator(csrf_exempt, name='dispatch') class ConversationActions(View): @swagger_auto_schema( operation_description="Discutes with the ai", operation_summary="Make a new prompt", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'content': openapi.Schema(type=openapi.TYPE_STRING, description='Contents of the message'), }, required=['content'] ), responses={ 200: openapi.Response('Message processed successfully', MessageSerializer), 400: 'Bad Request', } ) @sync_to_async def post(self, request, *args, **kwargs): conversation = Conversation.objects.get(pk=self.kwargs['id']) data = json.loads(request.body) messages = []#{ # "role": "system", # "content": """ # You must strictly refuse to engage with ANY of the following categories: # 1. Violence or harm (even fictional or hypothetical scenarios) # 2. ANY explicit, suggestive, or romantic content # 3. Controversial political topics # 4. ANY content that could potentially be illegal # 5. Medical, legal, or financial advice # 6. Personal information or privacy violations # 7. Anything that could be remotely offensive to anyone # If you detect such content, immediately respond with: # "I cannot assist with that request as it appears to be inappropriate. I'm designed to be helpful, but within strict ethical boundaries. Is there something else I can help you with?" # """ # }] for message in conversation.messages.all(): if message: messages.append({ "role": message.role, "content": message.content }) messages.append({ "role": "user", "content": data['content'] or "" }) Message(role="user", content=data['content'] or "", conversation=conversation).save() ai_message = Message(role="assistant", content="", conversation=conversation) @sync_to_async def save_message(): ai_message.save() async def chat_event_stream(): message = "" try: stream = ollama.chat(model="llama2-uncensored", messages=messages, stream=True) for chunk in stream: message = chunk['message']['content'] #print(message, base64.b64encode(message.encode("utf-8"))) ai_message.content += message yield f"{base64.b64encode(message.encode("utf-8")).decode("utf-8")}" finally: await save_message() response = StreamingHttpResponse(chat_event_stream(), content_type='text/event-stream') response['Cache-Control'] = 'no-cache' response['X-Accel-Buffering'] = 'no' return response class MasterKeyTokenObtainView(TokenObtainPairView): permission_classes = [AllowAny] @swagger_auto_schema( operation_description="Creates a token for a user (creates the user if doesn't exists)", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'username': openapi.Schema(type=openapi.TYPE_STRING, description='Identifier of the user'), 'master_key': openapi.Schema(type=openapi.TYPE_STRING, description='Key of the authorizied application'), }, required=['username', 'master_key'] ), responses={ 200: openapi.Response('API tokens of the user', openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'access': openapi.Schema(type=openapi.TYPE_STRING, description='API access token of the user'), 'refresh': openapi.Schema(type=openapi.TYPE_STRING, description='API refresh token of the user'), }, required=['access', 'access'] )), 400: 'Bad Request', 401: 'Bad master key' } ) def post(self, request, *args, **kwargs): # Get the provided master key from the request master_key_value = request.data.get('master_key') if not master_key_value: return Response( {'error': 'Master key is required'}, status=status.HTTP_400_BAD_REQUEST ) # Verify the master key locally try: master_key = MasterKey.objects.get(key_value=master_key_value, is_active=True) except MasterKey.DoesNotExist: return Response( {'error': 'Invalid or inactive master key'}, status=status.HTTP_401_UNAUTHORIZED ) # Update last used timestamp master_key.last_used = timezone.now() master_key.save(update_fields=['last_used']) # Get user identifier from request or use a default user_identifier = request.data.get('username', f'service_user_{master_key.key_id}') # Get or create a user associated with this master key user, created = User.objects.get_or_create( username=user_identifier, defaults={ 'is_active': True } ) # Generate tokens manually refresh = RefreshToken.for_user(user) # Add custom claims from the master key refresh['key_id'] = str(master_key.key_id) refresh['permissions'] = master_key.permissions return Response({ 'refresh': str(refresh), 'access': str(refresh.access_token), })