kopia lustrzana https://github.com/micropython/micropython-lib
Porównaj commity
3 Commity
17fd5eeb4e
...
6a6692003f
Autor | SHA1 | Data |
---|---|---|
Rahul Ravikumar | 6a6692003f | |
Damien George | 45ead11f96 | |
Rahul Ravikumar | 6f70eb5c84 |
|
@ -1,3 +1,3 @@
|
|||
metadata(version="0.2.0")
|
||||
metadata(version="0.2.1")
|
||||
|
||||
module("ssl.py", opt=3)
|
||||
|
|
|
@ -1,12 +1,5 @@
|
|||
import tls
|
||||
from tls import (
|
||||
CERT_NONE,
|
||||
CERT_OPTIONAL,
|
||||
CERT_REQUIRED,
|
||||
MBEDTLS_VERSION,
|
||||
PROTOCOL_TLS_CLIENT,
|
||||
PROTOCOL_TLS_SERVER,
|
||||
)
|
||||
from tls import *
|
||||
|
||||
|
||||
class SSLContext:
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
from uoauth2.device import DeviceAuth
|
||||
|
||||
# For more information on how to create clients
|
||||
# Look at: https://developers.google.com/identity/protocols/oauth2/limited-input-device
|
||||
|
||||
device_auth = DeviceAuth(
|
||||
client_id='648445354032-mv5p4b09hcj0116v57pnkmp42fn8m220.apps.googleusercontent.com',
|
||||
client_secret='9aeN3LGr0yq4TYjwGcfUVJKo',
|
||||
discovery_endpoint='https://accounts.google.com/.well-known/openid-configuration',
|
||||
scopes=list(['openid'])
|
||||
)
|
||||
|
||||
# Discover OpenID endpoints
|
||||
device_auth.discover()
|
||||
|
||||
# Start authorization process
|
||||
device_auth.authorize()
|
||||
|
||||
# Use the user-code and verification URL to show some UI to the user
|
||||
# To complete the authorization process.
|
||||
user_code = device_auth.user_code
|
||||
verification_url = device_auth.verification_url
|
||||
|
||||
print(user_code, verification_url)
|
||||
|
||||
# Check for completed authorization
|
||||
device_auth.check_authorization_complete()
|
||||
|
||||
# Fetch a valid access token
|
||||
print(device_auth.token())
|
|
@ -0,0 +1,4 @@
|
|||
srctype = micropython-lib
|
||||
type = module
|
||||
version = 0.1
|
||||
author = Rahul Ravikumar
|
|
@ -0,0 +1,20 @@
|
|||
import sys
|
||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
||||
# module instead of system's.
|
||||
sys.path.pop(0)
|
||||
from setuptools import setup
|
||||
sys.path.append("..")
|
||||
import sdist_upip
|
||||
|
||||
setup(name='micropython-uoauth2.device',
|
||||
version='0.1',
|
||||
description='uoauth2.device module for MicroPython',
|
||||
long_description="This is a module reimplemented specifically for MicroPython standard library,\nwith efficient and lean design in mind. Note that this module is likely work\nin progress and likely supports just a subset of CPython's corresponding\nmodule. Please help with the development if you are interested in this\nmodule.",
|
||||
url='https://github.com/micropython/micropython-lib',
|
||||
author='Rahul Ravikumar',
|
||||
author_email='micro-python@googlegroups.com',
|
||||
maintainer='micropython-lib Developers',
|
||||
maintainer_email='micro-python@googlegroups.com',
|
||||
license='MIT',
|
||||
cmdclass={'sdist': sdist_upip.sdist},
|
||||
py_modules=['uoauth2'])
|
|
@ -0,0 +1,317 @@
|
|||
import json
|
||||
import os
|
||||
import time
|
||||
import urllib.parse as urlparse
|
||||
|
||||
import urequests as requests
|
||||
|
||||
|
||||
def _exists(path):
|
||||
'''
|
||||
Return True if the path exists.
|
||||
'''
|
||||
|
||||
try:
|
||||
os.stat(path)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
class DeviceAuth:
|
||||
'''
|
||||
Helps with authenticating devices with limited input capabilities
|
||||
per the OAuth2 device flow specification.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
client_id,
|
||||
client_secret,
|
||||
discovery_endpoint,
|
||||
scopes=list(),
|
||||
saved_location=None
|
||||
):
|
||||
self.client_id = client_id
|
||||
self.client_secret = client_secret
|
||||
self.discovery_endpoint = discovery_endpoint
|
||||
self.scopes = scopes
|
||||
self.saved_location = saved_location
|
||||
|
||||
self.user_code = None
|
||||
self.verification_url = None
|
||||
|
||||
self._discovered = False
|
||||
self._authorization_started = False
|
||||
self._authorization_completed = False
|
||||
|
||||
self._device_auth_endpoint = None
|
||||
self._token_endpoint = None
|
||||
self._device_code = None
|
||||
self._interval = None
|
||||
self._code_expires_in = None
|
||||
|
||||
self._access_token = None
|
||||
self._token_acquired_at = None
|
||||
self._token_expires_in = None
|
||||
self._token_scope = None
|
||||
self._token_type = None
|
||||
self._refresh_token = None
|
||||
|
||||
def discover(self):
|
||||
'''
|
||||
Performs OAuth2 device endpoint discovery.
|
||||
'''
|
||||
|
||||
if not self._discovered:
|
||||
r = requests.request('GET', self.discovery_endpoint)
|
||||
j = r.json()
|
||||
self._device_auth_endpoint = j['device_authorization_endpoint']
|
||||
self._token_endpoint = j['token_endpoint']
|
||||
self._discovered = True
|
||||
r.close()
|
||||
|
||||
saved = self.save()
|
||||
if not saved:
|
||||
print('Unable to save auth state.')
|
||||
|
||||
def authorize(self):
|
||||
'''
|
||||
Makes an authorization request.
|
||||
'''
|
||||
|
||||
if not self._discovered:
|
||||
print('Need to discover authorization and token endpoints.')
|
||||
return
|
||||
|
||||
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
|
||||
payload = {
|
||||
'client_id': self.client_id,
|
||||
'scope': ' '.join(self.scopes)
|
||||
}
|
||||
encoded = urlparse.urlencode(payload)
|
||||
r = requests.request(
|
||||
'POST',
|
||||
self._device_auth_endpoint,
|
||||
data=encoded,
|
||||
headers=headers
|
||||
)
|
||||
j = r.json()
|
||||
r.close()
|
||||
|
||||
if 'error' in j:
|
||||
raise RuntimeError(j['error'])
|
||||
|
||||
self._device_code = j['device_code']
|
||||
self.user_code = j['user_code']
|
||||
self.verification_url = j['verification_url']
|
||||
self._interval = j['interval']
|
||||
self._code_expires_in = j['expires_in']
|
||||
self._authorization_started = True
|
||||
message = 'Use code %s at %s to authorize the device.' % (
|
||||
self.user_code,
|
||||
self.verification_url
|
||||
)
|
||||
print(message)
|
||||
|
||||
def check_authorization_complete(self, sleep_duration_seconds=5, max_attempts=10):
|
||||
'''
|
||||
Polls until completion of an authorization request.
|
||||
'''
|
||||
|
||||
if not self._authorization_started:
|
||||
print('Start an authorization request.')
|
||||
return
|
||||
|
||||
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
|
||||
payload = {
|
||||
'client_id': self.client_id,
|
||||
'client_secret': self.client_secret,
|
||||
'device_code': self._device_code,
|
||||
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code'
|
||||
}
|
||||
encoded = urlparse.urlencode(payload)
|
||||
|
||||
current_attempt = 0
|
||||
while not self.authorized and current_attempt < max_attempts:
|
||||
current_attempt = current_attempt + 1
|
||||
r = requests.request(
|
||||
'POST',
|
||||
self._token_endpoint,
|
||||
data=encoded,
|
||||
headers=headers
|
||||
)
|
||||
j = r.json()
|
||||
r.close()
|
||||
if 'error' in j:
|
||||
if j['error'] == 'authorization_pending':
|
||||
print('Pending authorization. ')
|
||||
time.sleep(sleep_duration_seconds)
|
||||
elif j['error'] == 'access_denied':
|
||||
print('Access denied')
|
||||
raise RuntimeError(j['error'])
|
||||
else:
|
||||
self._access_token = j['access_token']
|
||||
self._token_acquired_at = int(time.time())
|
||||
self._token_expires_in = j['expires_in']
|
||||
self._token_scope = j['scope']
|
||||
self._token_type = j['token_type']
|
||||
self._refresh_token = j['refresh_token']
|
||||
print('Completed authorization')
|
||||
self._authorization_completed = True
|
||||
saved = self.save()
|
||||
if not saved:
|
||||
print('Unable to save auth state.')
|
||||
|
||||
@property
|
||||
def authorized(self):
|
||||
return self._authorization_completed
|
||||
|
||||
def token(self, force_refresh=False):
|
||||
'''
|
||||
Fetches a valid access token.
|
||||
'''
|
||||
|
||||
if not self._authorization_completed:
|
||||
print('Complete an authorization request')
|
||||
return
|
||||
|
||||
buffer = 10 * 60 * -1 # 10 min in seconds
|
||||
now = int(time.time())
|
||||
is_valid = now < (
|
||||
self._token_acquired_at +
|
||||
self._token_expires_in +
|
||||
buffer
|
||||
)
|
||||
if not is_valid or force_refresh:
|
||||
print('Token expired. Refreshing access tokens.')
|
||||
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
|
||||
payload = {
|
||||
'client_id': self.client_id,
|
||||
'client_secret': self.client_secret,
|
||||
'refresh_token': self._refresh_token,
|
||||
'grant_type': 'refresh_token'
|
||||
}
|
||||
encoded = urlparse.urlencode(payload)
|
||||
r = requests.request(
|
||||
'POST',
|
||||
self._token_endpoint,
|
||||
data=encoded,
|
||||
headers=headers
|
||||
)
|
||||
status_code = r.status_code
|
||||
j = r.json()
|
||||
r.close()
|
||||
|
||||
if status_code == 400:
|
||||
print('Unable to refresh tokens.')
|
||||
raise(RuntimeError('Unable to refresh tokens.'))
|
||||
|
||||
print('Updated access tokens.')
|
||||
self._access_token = j['access_token']
|
||||
self._token_acquired_at = int(time.time())
|
||||
self._token_expires_in = j['expires_in']
|
||||
self._token_scope = j['scope']
|
||||
self._token_type = j['token_type']
|
||||
|
||||
saved = self.save()
|
||||
if not saved:
|
||||
print('Unable to store auth state.')
|
||||
|
||||
return self._access_token
|
||||
|
||||
def save(self):
|
||||
'''
|
||||
Serializes the auth state to a JSON payload and saves it in `location`.
|
||||
'''
|
||||
|
||||
if not self.saved_location:
|
||||
return True
|
||||
|
||||
payload = {
|
||||
'client_id': self.client_id,
|
||||
'client_secret': self.client_secret,
|
||||
'discovery_endpoint': self.discovery_endpoint,
|
||||
'scopes': self.scopes
|
||||
}
|
||||
|
||||
if self.saved_location:
|
||||
payload['saved_location'] = self.saved_location
|
||||
|
||||
if self._discovered:
|
||||
payload['discovered'] = True
|
||||
payload['device_auth_endpoint'] = self._device_auth_endpoint
|
||||
payload['token_endpoint'] = self._token_endpoint
|
||||
|
||||
if self.authorized:
|
||||
payload['authorized'] = True
|
||||
payload['refresh_token'] = self._refresh_token
|
||||
payload['access_token'] = self._access_token
|
||||
payload['token_acquired_at'] = self._token_acquired_at
|
||||
payload['token_expires_in'] = self._token_expires_in
|
||||
|
||||
try:
|
||||
with open(self.saved_location, 'w') as handle:
|
||||
json.dump(payload, handle)
|
||||
print('Saved auth state.')
|
||||
|
||||
return True
|
||||
except OSError as error:
|
||||
print('Error saving authentication state.', error)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, location):
|
||||
'''
|
||||
Loads authentication state from a given location.
|
||||
'''
|
||||
if not _exists(location):
|
||||
print('No serialized state.')
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(location, 'r') as handle:
|
||||
payload = json.load(handle)
|
||||
client_id = payload['client_id']
|
||||
client_secret = payload['client_secret']
|
||||
discovery_endpoint = payload['discovery_endpoint']
|
||||
scopes = payload['scopes']
|
||||
device_auth = DeviceAuth(
|
||||
client_id=client_id,
|
||||
client_secret=client_secret,
|
||||
discovery_endpoint=discovery_endpoint,
|
||||
scopes=scopes
|
||||
)
|
||||
|
||||
if 'saved_location' in payload:
|
||||
saved_location = payload['saved_location']
|
||||
device_auth.saved_location = saved_location
|
||||
|
||||
if 'discovered' in payload:
|
||||
device_auth_endpoint = payload['device_auth_endpoint']
|
||||
token_endpoint = payload['token_endpoint']
|
||||
device_auth._discovered = True
|
||||
device_auth._device_auth_endpoint = device_auth_endpoint
|
||||
device_auth._token_endpoint = token_endpoint
|
||||
|
||||
if 'authorized' in payload:
|
||||
refresh_token = payload['refresh_token']
|
||||
access_token = payload['access_token']
|
||||
token_acquired_at = payload['token_acquired_at']
|
||||
token_expires_in = payload['token_expires_in']
|
||||
device_auth._authorization_completed = True
|
||||
device_auth._refresh_token = refresh_token
|
||||
device_auth._access_token = access_token
|
||||
device_auth._token_acquired_at = token_acquired_at
|
||||
device_auth._token_expires_in = token_expires_in
|
||||
|
||||
return device_auth
|
||||
except Exception as error:
|
||||
print('Unable to create an instance of DeviceAuth.', error)
|
||||
try:
|
||||
os.remove(location)
|
||||
except OSError as error:
|
||||
# Do nothing
|
||||
pass
|
||||
|
||||
return None
|
Ładowanie…
Reference in New Issue