Tweepy with Django-Allauth
I have spent a good portion of this year developing Twitter software. Some of it is for research, account maintenance, and retweeting. Now, I am ready to put some of what I developed online, for public use.
I had been learning the Flask web framework, but it seemed inadequate for what I
wanted to do–namely keep user data private (including API keys). For the last month
or so, I have been banging-out Django code, mostly from tutorials. In fact, it prompted
me to write a utility script for customizing Django’s settings.py
file to my
preferences (see below).
Anyway, after much research and reading of code, I have grown to like the Django-Allauth package. It handles the creation of user accounts and the management of sessions quite nicely, and should I decide to deploy APIs on any of the other social networks that it handles, I suspect integrating them will be quite similar.
Once I figured-out what it was that I needed to do to get what I want, it was relatively
simple, as can be seen in the utils.py
file that I added to the Django-Allauth
Tutorial. And giving credit where due, I borrowed some of the code from
the DjangoTweepy repository on GitHub.
Here is the code:
# utils.py - Saturday, December 28, 2019 | |
# -*- coding: utf-8 -*- | |
""" I have been experimenting to see which web framework would make the better | |
wrapper for Tweepy. I decided on Django, with the Allauth package. | |
After going through the Django-Allauth Tutorial at | |
https://wsvincent.com/django-allauth-tutorial/ and playing with the DjangoTweepy repository at | |
https://github.com/martinjc/DjangoTweepy/blob/master/src/twitter_auth/utils.py, | |
this is what I contrived. | |
USAGE: | |
from . utils import get_api | |
... | |
api = get_api(request) | |
me = api.me() | |
screen_name = me.screen_name | |
whoami = api.get_user(screen_name) | |
print(whoami) | |
""" | |
import tweepy | |
from allauth.socialaccount import app_settings | |
def get_api(request):
    """Build a Tweepy API client from the Twitter token held in the session.

    Adapted from
    https://github.com/martinjc/DjangoTweepy/blob/master/src/twitter_auth/utils.py

    :param request: current request; its session must contain the entry
        ``oauth_api.twitter.com_access_token`` (created by Allauth) with the
        keys ``oauth_token`` and ``oauth_token_secret``
    :return: Tweepy API object
    """
    # The Twitter app record is where the API keys are stored.
    twitter_app = get_current_app(request, 'twitter')
    handler = tweepy.OAuthHandler(twitter_app.client_id, twitter_app.secret)

    # Fetch the session entry once, then hand both halves to the handler.
    token = request.session['oauth_%s_access_token' % 'api.twitter.com']
    handler.set_access_token(token['oauth_token'], token['oauth_token_secret'])

    return tweepy.API(handler)
def get_current_app(request, provider):
    """Return the SocialApp to use for ``provider``.

    Adapted from django-allauth/allauth/socialaccount/adapter.py

    :param request: current request, forwarded to the SocialApp manager lookup
    :param provider: provider id, e.g. ``'twitter'``
    :return: app: a SocialApp instance
    """
    from allauth.socialaccount.models import SocialApp

    config = app_settings.PROVIDERS.get(provider, {}).get('APP')
    if not config:
        # No 'APP' entry configured for this provider: ask the model manager.
        return SocialApp.objects.get_current(provider, request)

    # An 'APP' entry exists: build the SocialApp in memory from its values.
    app = SocialApp(provider=provider)
    for attr_name in ('client_id', 'secret', 'key'):
        setattr(app, attr_name, config.get(attr_name))
    return app
The utility script that I wrote for updating the Django settings.py
file can be found
below:
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# update_settings - Monday, December 23, 2019 | |
__version__ = '1.1.10' | |
import os, re | |
from glob import glob | |
from os.path import basename, dirname, exists, getmtime, join | |
def main():
    """Update settings.py with my preferences.

    Reads the file named by the module-level ``SETTINGS`` and applies, in order:

    1. inserts of the ``dotenv`` import and the ``load_dotenv()`` call right
       after their anchor lines;
    2. replacements that make SECRET_KEY, TIME_ZONE and DEBUG read from the
       environment;
    3. a ``'DIRS'`` entry pointing at ``BASE_DIR/templates`` inside TEMPLATES;
    4. one ``NAME = os.getenv('NAME')`` line for every variable in ``./.env``
       that is not already assigned somewhere in the settings file.

    When anything changed, the result is written to ``WORKFILE``, the previous
    settings file is kept as ``SETTINGS + '~'`` and the work file takes its
    place. Otherwise a message is printed and nothing is written.

    :raises ValueError: if an anchor line for an insert is not in the file
    :raises FileNotFoundError: if ``SETTINGS`` or ``./.env`` does not exist
    """
    with open(SETTINGS, 'rt') as infile:
        contents = [line.rstrip() for line in infile]
    contents_modified = False

    # Search & Insert: (anchor line, line to add immediately after it)
    search_inserts = [
        ("import os", "from dotenv import load_dotenv"),
        ("BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))",
         "load_dotenv(os.path.join(BASE_DIR, '.env'))"),
    ]
    for search, insert in search_inserts:
        try:
            s = contents.index(search)
        except ValueError:
            raise ValueError("%r not found in %s" % (search, SETTINGS)) from None
        # Compare a slice rather than testing `s` for truth: an anchor on the
        # very first line (index 0) is still a hit, and an anchor on the last
        # line has no following line to index.
        if contents[s + 1:s + 2] != [insert]:
            contents.insert(s + 1, insert)
            contents_modified = True

    # Search & Replace: (line prefix, replacement for that prefix)
    # NOTE(review): os.getenv() returns a string, so any non-empty DEBUG value
    # in the environment (even "False") makes the generated DEBUG truthy.
    search_replaces = [
        ("SECRET_KEY = ", "SECRET_KEY = os.getenv('SECRET_KEY') or "),
        ("TIME_ZONE = 'UTC'", "TIME_ZONE = os.getenv('TZ') or 'UTC'"),
        ("DEBUG = True", "DEBUG = os.getenv('DEBUG') or False"),
    ]
    for search, replace in search_replaces:
        for i, line in enumerate(contents):
            # `replace not in line` keeps repeated runs from stacking edits.
            if line.startswith(search) and replace not in line:
                contents[i] = line.replace(search, replace)
                contents_modified = True

    # Section Processing - TEMPLATES (from 'TEMPLATES = ' to the closing ']')
    search = "'DIRS': [],"
    replace = "'DIRS': [ os.path.join(BASE_DIR, 'templates'), ],"
    inside_templates = False
    for i, line in enumerate(contents):
        if line.startswith('TEMPLATES = '):
            inside_templates = True
        if inside_templates:
            if line.strip() == search:
                contents[i] = line.replace(search, replace)
                contents_modified = True
            if line == ']':
                inside_templates = False

    # Process .env file: add an os.getenv() line for every unassigned variable
    varnames = _env_var_names('.env')
    found = set()
    if varnames:
        # Escape the names so regex metacharacters in .env cannot alter the pattern.
        vars_regex = re.compile(
            r'(%s)\s*=' % '|'.join(re.escape(name) for name in varnames))
        for line in contents:
            match = vars_regex.search(line)
            if match:
                found.add(match.group(1))
    not_found = [name for name in varnames if name not in found]
    if not_found:
        addlines = ["\n\n# Additional environment variables in .env added by 'update_settings'"]
        addlines += ["%s = os.getenv('%s')" % (name, name) for name in not_found]
        contents += addlines + ['\n\n\n']
        contents_modified = True

    # Process any changes
    if not contents_modified:
        print("No modifications detected.")
        return

    contents = _drop_repeated_lines(contents)
    with open(WORKFILE, 'wt') as outfile:
        # Text mode already translates '\n' to the platform line ending;
        # writing os.linesep here would produce '\r\r\n' on Windows.
        outfile.writelines(line + '\n' for line in contents)
    # os.replace overwrites an existing backup on every platform.
    os.replace(SETTINGS, SETTINGS + '~')
    os.replace(WORKFILE, SETTINGS)
    return


def _env_var_names(path):
    """Return the distinct variable names assigned in the dotenv file at ``path``."""
    names = []
    with open(path, 'rt') as envfile:
        for raw in envfile:
            entry = raw.strip()
            # Skip blank/whitespace-only lines and comments (indented or not).
            if not entry or entry.startswith('#'):
                continue
            name = entry.split('=')[0].strip()
            if name and name not in names:
                names.append(name)
    return names


def _drop_repeated_lines(lines):
    """Drop a line that repeats the most recent non-blank line; keep blank lines."""
    kept = []
    last_text = ''
    for line in lines:
        if line != last_text:
            kept.append(line)
        if line.strip() != '':
            last_text = line
    return kept
def init():
    """Prepare for main() processing.

    Searches the current directory tree for a file named by the module-level
    ``SETTINGS`` and re-points the path globals at the first match:

    * ``SETTINGS``    - path of the file that was found
    * ``WORKFILE``    - same directory, file name prefixed with ``.``
    * ``BACKUP_FILE`` - ``SETTINGS + '~'``, the name main() gives the backup,
      so that eoj() looks for the backup where it is actually written

    :raises FileNotFoundError: if no matching file exists
    """
    global SETTINGS, WORKFILE, BACKUP_FILE
    matches = glob('**/%s' % SETTINGS, recursive=True)
    if not matches:
        raise FileNotFoundError(SETTINGS)
    SETTINGS = matches[0]
    if not exists(SETTINGS):
        raise FileNotFoundError(SETTINGS)
    # Let os.path take the path apart instead of splitting on os.sep by hand.
    folder, filename = os.path.split(SETTINGS)
    WORKFILE = os.path.join(folder, '.' + filename)
    BACKUP_FILE = SETTINGS + '~'
    return
def eoj():
    """End of job: open ``meld`` on the backup and the updated settings file.

    Does nothing when the file named by ``BACKUP_FILE`` does not exist. When
    the ``meld`` executable cannot be found, a message is printed and the
    comparison is skipped.
    """
    import subprocess  # only needed here

    if not exists(BACKUP_FILE):
        return
    try:
        # Pass an argument list (no shell) so that paths containing spaces or
        # shell metacharacters reach meld unchanged.
        subprocess.run(['meld', BACKUP_FILE, SETTINGS])
    except FileNotFoundError:
        print("meld not found; skipping comparison of %s and %s"
              % (BACKUP_FILE, SETTINGS))
    return
def debug_breakpoint():
    """Do nothing and return None.

    Presumably a convenient place to hang a debugger breakpoint (going by the
    name); nothing in this listing calls it.
    """
    return None
if __name__ == '__main__':
    # Default file names, relative to the current directory; init() replaces
    # SETTINGS and WORKFILE with the paths of the settings file it finds.
    SETTINGS = 'settings.py'
    WORKFILE = '.%s' % SETTINGS
    BACKUP_FILE = '%s~' % SETTINGS

    # Locate the file, rewrite it, then show the differences.
    for step in (init, main, eoj):
        step()
(If you do not have meld
installed, you might want to comment out the line in eoj()
that calls it, or comment out the call to eoj()
itself.)