Coverage for services/google.py: 25%
83 statements
« prev ^ index » next coverage.py v6.4.4, created at 2024-04-30 13:30 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2024-04-30 13:30 -0600
1import json
2from datetime import datetime
4import google.auth.transport.requests
5from django.conf import settings
6from django.utils import timezone
7from google.oauth2.credentials import Credentials
8from google_auth_oauthlib.flow import Flow
9from googleapiclient.discovery import build
10from oauth2client.client import GoogleCredentials
12from apps.users.models import GoogleAccount, User
# OAuth scopes requested when building the Google authorization flow.
SCOPES = [
    # Identity scopes: OpenID Connect sign-in plus e-mail and profile info.
    "openid",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
    # Google Calendar scopes: calendars, events and read-only settings.
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/calendar.events",
    "https://www.googleapis.com/auth/calendar.settings.readonly",
]

# Path to the OAuth client-secrets file handed to
# ``Flow.from_client_secrets_file``. It is relative, so it is resolved against
# the working directory of the running process.
SECRETS_FILE_LOCATION = "./client_secret.json"
class GoogleService:
    """Helper around Google OAuth2 credentials and the Calendar v3 client.

    Instance attributes are created lazily by the methods below:

    * ``ga`` -- the ``GoogleAccount`` the credentials were loaded from
      (set by ``_get_credentials_for_user`` / ``_build_credentials_from_db``).
    * ``credentials`` -- the credentials object currently in use.
    * ``flow`` -- the OAuth ``Flow`` (set by ``_create_flow`` /
      ``_get_flow_by_state``).
    * ``auth_url`` / ``state`` -- set by ``_get_auth_url_and_state``.
    * ``service`` -- the Calendar client built by ``_build_service``.
    """

    # Layouts accepted for the persisted "expiry" string, most specific first.
    # ``isoformat()`` drops the fractional part when microseconds are zero, so
    # the whole-second variants must be accepted as well.
    _EXPIRY_FORMATS = (
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
    )

    @staticmethod
    def _parse_expiry(raw_expiry):
        """Convert a persisted expiry string into a naive ``datetime``.

        Args:
            raw_expiry: The ``"expiry"`` value read from the stored
                credentials JSON, or ``None``/empty when absent.

        Returns:
            A naive ``datetime`` (no tzinfo attached), or ``None`` when
            ``raw_expiry`` is empty.

        Raises:
            ValueError: If the string matches none of the known layouts.
        """
        if not raw_expiry:
            return None
        for layout in GoogleService._EXPIRY_FORMATS:
            try:
                return datetime.strptime(raw_expiry, layout)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised credentials expiry format: {raw_expiry!r}")

    def _refresh_tokens(self):
        """Refresh ``self.credentials`` and persist them when an account is bound.

        The new token data is written to ``self.ga`` and saved. When no
        account is bound (for example the credentials came straight from the
        OAuth flow) the refresh still happens but nothing is persisted.
        """
        print(">>>> _refresh_tokens")
        request = google.auth.transport.requests.Request()
        self.credentials.refresh(request)

        account = getattr(self, "ga", None)
        if account is None:
            # Nothing to persist to; avoid failing on a missing attribute.
            return

        # Store the refreshed credentials on the account row.
        account.credentials_json = self.credentials.to_json()
        account.token = self.credentials.token
        account.refresh_token = self.credentials.refresh_token
        # NOTE(review): ``expiry`` looks like a naive datetime here; if the
        # project runs with USE_TZ=True confirm how ``expires_at`` interprets it.
        account.expires_at = self.credentials.expiry
        account.save()

    def _build_credentials_from_db(self, ga: "GoogleAccount"):
        """Rebuild credentials from ``ga.credentials_json``.

        The stored JSON is decoded, its ``"expiry"`` entry is converted to a
        ``datetime`` and the resulting dict is passed as keyword arguments to
        ``Credentials``. Expired credentials are refreshed before returning.

        Args:
            ga: Account row holding the serialized credentials.

        Returns:
            The credentials object, also stored on ``self.credentials``.

        Raises:
            ValueError: If the stored expiry string has an unknown layout.
        """
        print(">>>> _build_credentials_from_db")
        # Bind the account so a refresh persists to the row we loaded from.
        self.ga = ga

        credentials_dict = json.loads(ga.credentials_json)
        credentials_dict["expiry"] = self._parse_expiry(credentials_dict.get("expiry"))
        self.credentials = Credentials(**credentials_dict)

        if self.credentials.expired:
            print("expiraron las credenciales, las refresheare")
            self._refresh_tokens()

        return self.credentials

    def _get_credentials_for_user(self, user: "User"):
        """Return the stored credentials for ``user``.

        Looks up ``user.googleaccount``; returns ``None`` when the user has
        no such attribute or it is falsy.
        """
        print(">>>> _get_credentials_for_user")
        self.ga = getattr(user, "googleaccount", None)
        if not self.ga:
            print("credenciales de usuario NO ENCONTRADAS")
            return None

        print("credenciales del usuario encontradas")
        self.credentials = self._build_credentials_from_db(ga=self.ga)
        return self.credentials

    def _get_credentials_from_flow(self):
        """Return ``self.flow.credentials``, creating the flow first if needed."""
        if not hasattr(self, "flow"):
            self._create_flow()
        self.credentials = self.flow.credentials
        return self.credentials

    def _get_flow_token(self, code: str):
        """Exchange an authorization ``code`` through ``self.flow.fetch_token``.

        Requires ``self.flow`` to have been set beforehand. Returns whatever
        ``fetch_token`` returns.
        """
        print(">>>> _get_flow_token")
        return self.flow.fetch_token(code=code)

    def _get_auth_url_and_state(self):
        """Return ``(auth_url, state)`` for the consent screen.

        Creates the flow when it is missing and stores both values on the
        instance. On any exception the error is printed and ``None`` is
        returned instead of a tuple.
        """
        if not getattr(self, "flow", None):
            self._create_flow()
        try:
            auth_url, state = self.flow.authorization_url(prompt="consent")
            self.auth_url = auth_url
            self.state = state
            return auth_url, state
        except Exception as e:
            print(e)
            return None

    def _load_flow(self, **flow_kwargs):
        """Build a ``Flow`` from the client-secrets file and store it.

        Shared by ``_create_flow`` and ``_get_flow_by_state``. Extra keyword
        arguments are forwarded to ``Flow.from_client_secrets_file``. On any
        exception the error is printed, ``self.flow`` is left untouched and
        ``None`` is returned.
        """
        try:
            flow = Flow.from_client_secrets_file(
                SECRETS_FILE_LOCATION,
                scopes=SCOPES,
                redirect_uri=settings.GOOGLE_OAUTH_REDIRECT_URI,
                **flow_kwargs,
            )
            self.flow = flow
            return flow
        except Exception as e:
            print(e)
            return None

    def _get_flow_by_state(self, state: str):
        """Create a flow bound to an existing OAuth ``state`` value.

        Returns the flow (also stored on ``self.flow``) or ``None`` on error.
        """
        return self._load_flow(state=state)

    def _create_flow(self):
        """Create a fresh flow without a preset state.

        Returns the flow (also stored on ``self.flow``) or ``None`` on error.
        """
        return self._load_flow()

    def _build_service(self):
        """Build the Calendar v3 client from ``self.credentials``.

        Expired credentials are refreshed first. The client is stored on
        ``self.service`` and returned.
        """
        print(">>>> _build_service")
        if self.credentials.expired:
            self._refresh_tokens()
        self.service = build("calendar", "v3", credentials=self.credentials)
        return self.service