# -*- encoding: utf-8 -*- ########################### # # # License # # # ########################### # Do what ever you want with it. # If you like it, it would be nice if # you donate something to # HambacherForst Besetzung # Greenpeace # Robinwood # Junepa # Ende Gelaende # uberspace (thanks for hosting it) # or any other organisation wich deserv it. import re import html import json import os from birdy.twitter import UserClient import datetime import urllib """ Settings """ list_id = "1033423240755113986" CONSUMER_KEY = "" CONSUMER_SECRET = "" ACCESS_TOKEN = "" ACCESS_TOKEN_SECRET = "" # Count of Entrys count = 40 warning_count = 40 #Words to verify the topic buzzwords = ["hambi", "hambibleibt", "hambacherforst", "Wald", "hambacher"] #Accounts wich always tweets about the specific topic whitelist_accounts = ["oaktown", "AktionUnterholz", "HambiBleibt", "MahnwacheHambi", "HambiEA"] #Accounts which do not use timetags and are deactivated to prevent false detection warning_blacklist = ["energieliga", "BonnimWandel"] #Word which marks the tweet as important. words_of_interest = ["achtung", "warnung", "warning", "kontrolle", "wannen", "personalien"] #template for every tweet entry_template = "
_user_ @_screen_name_

_tweet_

[_time_]
" #html path (in most cases /var/www/html) html_path = "html/" """ Variables """ items = [] items_warning = [] picture_files = [] i = 0 last_id = 0 max_loops = 5 client = None """ Functions""" def isTimeTagged(text): """Checks if the text contains a timetag like (0923 or 09:23) and return a bool""" # e.g. 0157 1959 but not 3960 5809 regexp1 = re.compile(r" [0-2][0-9][0-5][0-9] ") # same but with ":" regexp2 = re.compile(r"^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$") return regexp1.search(text) or regexp2.search(text) def resetPictureCache(): """Deletes every file in the pictures folder, that is no longer needed.""" #Create pictures Folder if not os.path.exists(html_path + "pictures"): os.makedirs(html_path + "pictures") else: #Deletes every file which is not in the picture_files list for the_file in os.listdir(html_path + "pictures"): file_path = os.path.join(html_path + "pictures", the_file) if not the_file in picture_files: try: if os.path.isfile(file_path): os.unlink(file_path) except Exception as e: pass def generateHTML(item): """Generates a HTML entry out of the given item.""" item_html = "
" #Here we replace all Placeholders from the template inner_entry = entry_template.replace("_user_", item["display_name"]) inner_entry = inner_entry.replace("_tweet_", item["html_text"]) inner_entry = inner_entry.replace("_link_", item["tweet_url"]) inner_entry = inner_entry.replace("_screen_name_", item["screen_name"]) inner_entry = inner_entry.replace("_class_", item["css_class"]) inner_entry = inner_entry.replace("_time_", item["created_at"]) item_html += inner_entry + "

" return item_html def writeHTMLFile(feed_name = "#HambiBleibt allgemeiner Ticker", file_name = "ticker.html", collection = items): """Generates a HTML page out of the feed_name, file_name and the collection.""" #Static HTML, refreshrate 120s html_text = ""\ +""\ +""\ +"" #Header html_text += "

" \ + feed_name \ + "

" \ + "by @j4nkr |" \ + " CSS by " \ + "@BonnimWandel
" #Add every entry to page for item in collection: html_text = html_text + generateHTML(item) #Add timestamp and statistics html_text = html_text + "
Last Update: " + str(datetime.datetime.now()) + " | "+ str(i) + " Tweets wurden analysiert.
" #writes html file wr = open("html/" + file_name, 'w') wr.write(html_text) def analyzeTweets(tweets): """ Takes every important information out of the tweets """ global i global last_id global items global count global warning_count for tweet in tweets.data: tweet_json = {} #Fill basic data tweet_json["display_name"] = tweet["user"]["name"] tweet_json["screen_name"] = tweet["user"]["screen_name"] tweet_json["tweet_url"] = "https://twitter.com/"+ tweet["user"]["screen_name"]+ "/status/" + tweet["id_str"] tweet_json["plain_text"] = tweet["full_text"] tweet_json["html_text"] = tweet["full_text"] #The time deliverd by Twitter is 2 hrs off, so we correct this by adding 2hrs created_at = datetime.datetime.strptime( tweet["created_at"][:-11], "%a %b %d %H:%M:%S" ) created_at = datetime.timedelta(hours = 2) + created_at created_at = created_at.replace(year = datetime.datetime.now().year) tweet_json["created_at"] = created_at.strftime("%d. %B %H:%M:%S") #Replace URLs with links for url in tweet["entities"]["urls"]: tweet_json["html_text"] = tweet_json["html_text"].replace( url["url"] , "" + url["expanded_url"] + "" ) #Replace Hashtags with links for hashtag in tweet["entities"]["hashtags"]: tweet_json["html_text"] = tweet_json["html_text"].replace( "#" + hashtag["text"], "#" + hashtag["text"] + "" ) #Replace @Mentions with links for user_mentions in tweet["entities"]["user_mentions"]: tweet_json["html_text"] = tweet_json["html_text"].replace( "@" + user_mentions["screen_name"], "@" + user_mentions["screen_name"] + "" ) #Adds media to html_text tweet_media = [] if "media" in tweet["entities"]: tweet_json["html_text"] += "
" for media in tweet["entities"]["media"]: if media["type"] == "photo": #add Media links and ids to data tweet_media.append( [ media["media_url_https"], 'html/pictures/' + media["id_str"] + ".jpg" ] ) urllib.request.urlretrieve( media["media_url_https"], 'html/pictures/' + media["id_str"] + ".jpg" ) #make sure to not delete it from cache picture_files.append(media["id_str"] + ".jpg") tweet_json["html_text"] += \ "" tweet_json["html_text"] += "
" tweet_json["images"] = tweet_media tweet_json["css_class"] = "user" #Check if a account is whitelisted found = False if tweet["user"]["screen_name"] in whitelist_accounts: #check if it contains important words. found_word_of_interest = False for wo in words_of_interest: if wo.lower() in tweet_json["plain_text"].lower(): found_word_of_interest = True #change the css class tweet_json["css_class"] = "whitelist_user_important" #add it to the items_warning collection items_warning.append(tweet_json) warning_count -= - 1 break if not found_word_of_interest: #if the text contains timetags the tweet is also important if isTimeTagged(tweet_json["plain_text"]): tweet_json["css_class"] = "time_tagged" items_warning.append(tweet_json) warning_count = warning_count - 1 else: tweet_json["css_class"] = "whitelist_user" if count > 0 : count = count - 1 items.append(tweet_json) found = True #if the account is not whitelisted if not found: #check for buzzwords for bw in buzzwords: if bw.lower() in tweet_json["plain_text"].lower(): #make sure account is not blacklisted for timetags blacklisted = False for black in warning_blacklist: if black.lower() in tweet["user"]["screen_name"].lower(): blacklisted = True break if not blacklisted: #if not detect them if isTimeTagged(tweet_json["plain_text"]): tweet_json["css_class"] = "time_tagged" items_warning.append(tweet_json) warning_count = warning_count - 1 if count > 0 : items.append(tweet_json) count = count - 1 break i += 1 last_id = tweet["id"] if count == 0 and warning_count == 0: break def gatherItems(): """ Gets the Tweets and analyze them. """ global max_loops tweets = client.api['lists/statuses'].get( list_id = list_id, count = 200, tweet_mode= "extended" ) analyzeTweets(tweets) while (count != 0 or warning_count != 0) and max_loops > 0: tweets = client.api['lists/statuses'].get( list_id = list_id, count = 200, tweet_mode= "extended", max_id = last_id ) analyzeTweets(tweets) max_loops -= 1 def main(): global client #Connect to Twitter client = UserClient( CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET ) #Get and analyze the tweets gatherItems() #Create the normal ticker writeHTMLFile( feed_name="#HambiBleibt allgemeiner Ticker", file_name="ticker.html", collection= items ) #Create the warnings ticker writeHTMLFile( feed_name="#HambiBleibt (Warnungs-)Ticker", file_name="ticker-warning.html", collection= items_warning ) #cleans the cache of pictures. resetPictureCache() if __name__ == "__main__": main()