Twitter Bot loop

This commit is contained in:
Manuel Hude 2021-05-02 21:48:42 +02:00
parent 8fcdc6ae7f
commit 35b2eb8409
9 changed files with 78 additions and 73 deletions

View File

@ -1,3 +1,5 @@
from django.contrib import admin
from app_be.models import Tweet
# Register your models here.
admin.site.register(Tweet)

View File

@ -3,3 +3,10 @@ from django.apps import AppConfig
class AppBeConfig(AppConfig):
name = 'app_be'
def ready(self):
from app_be.views.twitter_bot import twitter_bot
twitter_bot1 = twitter_bot()
twitter_bot1.start()
print("bot started")

View File

@ -20,7 +20,7 @@ class FeedEntry(models.Model):
class Tweet(models.Model):
icon = models.ForeignKey(Icon, on_delete=models.CASCADE)
icon = models.ForeignKey(Icon, on_delete=models.CASCADE,null=True)
text = models.CharField(max_length=137)
date_time = models.DateTimeField()
url = models.CharField(max_length=100)

View File

@ -24,7 +24,7 @@ DEBUG = True
# Application definition
INSTALLED_APPS = [
'app_be',
'app_be.apps.AppBeConfig',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',

View File

@ -18,13 +18,12 @@ from django.contrib import admin
from django.urls import path
from rest_framework.routers import DefaultRouter
from app_be.views.rest_api import *
from app_be.views.twitter import *
urlpatterns = [
path('admin/', admin.site.urls),
url(r'^api/login', LoginClass.login),
url(r'^getSixTweets', TwitterClass.getLastSixTweets),
url(r'^getTweleveTweets',TwitterClass.getLastSixTweets)
url(r'^getTwelveTweets',TwitterClass.getLastSixTweets)
]
router = DefaultRouter()

View File

@ -1,68 +0,0 @@
import twitter
import os
import sched, time, datetime
import asyncio
import sys
from app_be.models import Tweet
from dotenv import load_dotenv
load_dotenv()
api = twitter.Api(consumer_key=os.getenv('TWITTER_API_KEY'),
consumer_secret=os.getenv('TWITTER_API_SECRET'),
access_token_key=os.getenv('TWITTER_ACCESS_TOKEN'),
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'))
class twitter_api:
@staticmethod
def check_credentials():
print(api.VerifyCredentials())
@staticmethod
def get_last_six_tweets():
last_six = Tweet.objects.all().order_by('-id')[:6]
print(last_six)
@staticmethod
def get_last_twelve_tweets():
last_twelve = Tweet.objects.all().order_by('-id')[:12]
print(last_twelve)
@staticmethod
def post_update(text, url):
update = text + ": " + url
api.PostUpdate(update)
class twitter_bot:
def scan_active_feed(self):
starttime = time.time()
while True:
# Search RSS Feeds TODO: for looping over feeds
print("Looking for new results in active feeds")
new_tweet = Tweet()
new_tweet.icon = None
new_tweet.text = "Test 1"
new_tweet.date_time = datetime.datetime.now()
new_tweet.url = "www.google.com"
# if tweet not in db yet
if Tweet.objects.filter(text=new_tweet.text) == []:
# call twitter api to post tweet and store to db
print("Posting update on Twitter")
twitter_api.post_update(new_tweet.text, new_tweet.url)
new_tweet.save()
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
if 'runserver' in sys.argv:
twitter_bot = twitter_bot()
twitter_bot.scan_active_feed()

View File

@ -0,0 +1,33 @@
import twitter
import os
from app_be.models import Tweet
from dotenv import load_dotenv
load_dotenv()
api = twitter.Api(consumer_key=os.getenv('TWITTER_API_KEY'),
consumer_secret=os.getenv('TWITTER_API_SECRET'),
access_token_key=os.getenv('TWITTER_ACCESS_TOKEN'),
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'))
class twitter_api:
@staticmethod
def check_credentials():
print(api.VerifyCredentials())
@staticmethod
def get_last_six_tweets():
last_six = Tweet.objects.all().order_by('-id')[:6]
print(last_six)
@staticmethod
def get_last_twelve_tweets():
last_twelve = Tweet.objects.all().order_by('-id')[:12]
print(last_twelve)
@staticmethod
def post_update(text, url):
update = text + ": " + url
api.PostUpdate(update)

View File

@ -0,0 +1,32 @@
import threading
from datetime import datetime
import time
from app_be.models import Tweet
from app_be.views.twitter_api import twitter_api
class twitter_bot(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
# Search RSS Feeds TODO: for looping over feeds
print("Looking for new results in active feeds")
new_tweet = Tweet()
new_tweet.icon = None
new_tweet.text = "Test 2"
new_tweet.date_time = datetime.now()
new_tweet.url = "www.google.com"
# if tweet not in db yet
if Tweet.objects.filter(text=new_tweet.text) == []:
# call twitter api to post tweet and store to db
print("Posting update on Twitter")
twitter_api.post_update(new_tweet.text, new_tweet.url)
new_tweet.save()
time.sleep(60)

View File

@ -18,7 +18,7 @@ setup(name='waecm-2021-group-04',
'oauthlib==3.1.0',
'py-jwt-validator==0.6.0',
'python-twitter==3.5',
'python-dotenv==0.17.1'
'python-dotenv==0.17.1',
],
license='BSD License', # example license
description='DESCRIPTION')