Coverage for services/google.py: 25%

83 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2024-04-30 13:30 -0600

1import json 

2from datetime import datetime 

3 

4import google.auth.transport.requests 

5from django.conf import settings 

6from django.utils import timezone 

7from google.oauth2.credentials import Credentials 

8from google_auth_oauthlib.flow import Flow 

9from googleapiclient.discovery import build 

10from oauth2client.client import GoogleCredentials 

11 

12from apps.users.models import GoogleAccount, User 

13 

# OAuth scopes requested whenever a Flow is created from the client secrets
# file. Order is preserved as-is because the list is handed to the OAuth
# library unchanged.
SCOPES = [
    # Identity / profile scopes.
    "openid",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
    # Calendar scopes.
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/calendar.events",
    "https://www.googleapis.com/auth/calendar.settings.readonly",
]

# Location of the OAuth client secrets file. The path is relative, so it is
# resolved against the working directory of the running process.
SECRETS_FILE_LOCATION = "./client_secret.json"

23 

24 

class GoogleService:
    """Wrap the Google OAuth flow and build a Calendar v3 API client.

    State is kept on the instance as plain attributes that the methods below
    set as they run: ``ga`` (the user's Google account record),
    ``credentials``, ``flow``, ``auth_url``, ``state`` and ``service``.
    """

    # Layouts accepted for the ``expiry`` value stored in the credentials
    # JSON. isoformat-style timestamps drop the fractional part when the
    # microsecond component is zero, so both variants must be handled.
    _EXPIRY_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def _parse_expiry(raw_expiry):
        """Convert a stored ``expiry`` string into a naive ``datetime``.

        Args:
            raw_expiry: Timestamp such as ``"2024-04-30T13:30:00.123456Z"``
                or ``"2024-04-30T13:30:00Z"``; may be ``None`` or empty.

        Returns:
            The parsed naive ``datetime``, or ``None`` when no expiry is
            stored.

        Raises:
            ValueError: If the value matches none of the known layouts.
        """
        if not raw_expiry:
            return None
        for fmt in GoogleService._EXPIRY_FORMATS:
            try:
                return datetime.strptime(raw_expiry, fmt)
            except ValueError:
                continue
        raise ValueError(
            f"Unrecognised credentials expiry timestamp: {raw_expiry!r}"
        )

    def _refresh_tokens(self):
        """Refresh ``self.credentials`` and persist the new tokens.

        When an account record is attached (``self.ga``), the refreshed
        credentials are copied onto it and saved. Without one (for example
        credentials that came straight from the OAuth flow) only the
        in-memory credentials are refreshed.
        """
        print(">>>> _refresh_tokens")
        request = google.auth.transport.requests.Request()
        self.credentials.refresh(request)

        account = getattr(self, "ga", None)
        if account is None:
            # Nothing to persist to; the refreshed credentials stay usable
            # on this instance.
            return

        # Persist the refreshed credentials.
        account.credentials_json = self.credentials.to_json()
        account.token = self.credentials.token
        account.refresh_token = self.credentials.refresh_token
        account.expires_at = self.credentials.expiry
        account.save()

    def _build_credentials_from_db(self, ga: "GoogleAccount"):
        """Rebuild ``Credentials`` from the JSON stored on ``ga``.

        The account is remembered on the instance so that a refresh triggered
        here is saved back to the same record. Expired credentials are
        refreshed before being returned.

        Args:
            ga: Account record whose ``credentials_json`` holds the
                serialized credentials.

        Returns:
            The (possibly refreshed) credentials, also stored on
            ``self.credentials``.

        Raises:
            ValueError: If the stored JSON or its ``expiry`` is malformed.
        """
        print(">>>> _build_credentials_from_db")
        # _refresh_tokens() writes to self.ga, so make sure it points at the
        # account these credentials belong to.
        self.ga = ga

        credentials_dict = json.loads(ga.credentials_json)
        credentials_dict["expiry"] = self._parse_expiry(
            credentials_dict.get("expiry")
        )
        self.credentials = Credentials(**credentials_dict)

        if self.credentials.expired:
            print("expiraron las credenciales, las refresheare")
            self._refresh_tokens()

        return self.credentials

    def _get_credentials_for_user(self, user: "User"):
        """Return credentials for ``user``, or ``None`` if none are linked.

        Looks up ``user.googleaccount``; when it is missing or falsy the
        method returns ``None`` and leaves ``self.ga`` set to that value.
        """
        print(">>>> _get_credentials_for_user")
        self.ga = getattr(user, "googleaccount", None)
        if not self.ga:
            print("credenciales de usuario NO ENCONTRADAS")
            return None

        print("credenciales del usuario encontradas")
        self.credentials = self._build_credentials_from_db(ga=self.ga)
        return self.credentials

    def _get_credentials_from_flow(self):
        """Return the credentials held by the OAuth flow.

        A flow is created first when the instance does not have one yet.
        """
        if not getattr(self, "flow", None):
            self._create_flow()
        self.credentials = self.flow.credentials
        return self.credentials

    def _get_flow_token(self, code: str):
        """Exchange the authorization ``code`` for tokens via ``self.flow``.

        Requires ``self.flow`` to have been set already (by ``_create_flow``
        or ``_get_flow_by_state``).

        Returns:
            Whatever ``flow.fetch_token`` returns.
        """
        print(">>>> _get_flow_token")
        return self.flow.fetch_token(code=code)

    def _get_auth_url_and_state(self):
        """Return ``(auth_url, state)`` for the consent screen.

        Both values are also stored on the instance. On any failure the
        exception is printed and ``None`` is returned instead of a tuple, so
        callers must check the result before unpacking it.
        """
        if not getattr(self, "flow", None):
            self._create_flow()
        try:
            auth_url, state = self.flow.authorization_url(prompt="consent")
        except Exception as e:
            # Best-effort by design: report the problem and signal failure
            # with None rather than propagating.
            print(e)
            return None
        self.auth_url = auth_url
        self.state = state
        return auth_url, state

    def _load_flow(self, **flow_kwargs):
        """Create a ``Flow`` from the client secrets file and store it.

        Args:
            **flow_kwargs: Extra keyword arguments forwarded to
                ``Flow.from_client_secrets_file`` (e.g. ``state``).

        Returns:
            The new flow (also stored on ``self.flow``), or ``None`` when
            creation failed; the error is printed in that case and
            ``self.flow`` is left untouched.
        """
        try:
            flow = Flow.from_client_secrets_file(
                SECRETS_FILE_LOCATION,
                scopes=SCOPES,
                redirect_uri=settings.GOOGLE_OAUTH_REDIRECT_URI,
                **flow_kwargs,
            )
        except Exception as e:
            # Best-effort by design: callers receive None on failure.
            print(e)
            return None
        self.flow = flow
        return flow

    def _get_flow_by_state(self, state: str):
        """Create and store a flow bound to an existing OAuth ``state``.

        Returns:
            The flow, or ``None`` if it could not be created.
        """
        return self._load_flow(state=state)

    def _create_flow(self):
        """Create and store a fresh flow with no preset state.

        Returns:
            The flow, or ``None`` if it could not be created.
        """
        return self._load_flow()

    def _build_service(self):
        """Build the Calendar v3 client from ``self.credentials``.

        Expired credentials are refreshed first. The client is stored on
        ``self.service`` and returned.
        """
        print(">>>> _build_service")
        if self.credentials.expired:
            self._refresh_tokens()
        self.service = build("calendar", "v3", credentials=self.credentials)
        return self.service