Files
api.digisnaxx/events/digitools.py
2026-01-09 22:44:40 -05:00

368 lines
12 KiB
Python

import os
import sys
from datetime import datetime, timedelta
from pprint import pprint as ppr
from time import sleep

import django
import pytz
from dateutil import relativedelta
from lxml import html
from selenium import webdriver as wd
from xvfbwrapper import Xvfb

from events.models import Event as DSEvent, Organization, Promo, Scraper, Calendar
# tz = pytz.timezone("US/Central")
# One-month offset used to derive the "next month" date below.
plus_one_month = relativedelta.relativedelta(months=1)
# NOTE: evaluated once, when this module is imported, so it reflects the
# import date rather than the date on which a helper is later called.
odt_next_month = datetime.now() + plus_one_month
# Get Scraper name, item count and online_calendar (virtcal)
def getScraper(venue, webite, cal):
    """Fetch or create the Scraper row for *venue* and refresh its item count.

    Args:
        venue: Object whose ``name`` identifies the scraper.
        webite: Website URL stored on a newly created scraper. The
            misspelled parameter name is kept so existing callers keep
            working.
        cal: Shortcode of the Calendar attached to a newly created scraper.

    Returns:
        Tuple ``(scraper, item_count, virtcal)`` where ``virtcal`` is the
        Calendar with shortcode ``'000'``.
    """
    virtcal = Calendar.objects.get(shortcode='000')
    try:
        # Look the scraper up by name only; the remaining values are applied
        # solely when a new row is created. Passing them as lookup arguments
        # (in particular last_ran=now) could never match an existing row.
        scraper, created = Scraper.objects.get_or_create(
            name=venue.name,
            defaults={
                'website': webite,
                'calendar': Calendar.objects.get(shortcode=cal),
                'items': 0,
                'new_items': 0,
                'last_ran': datetime.now(),
            },
        )
    except Exception as e:
        # Best-effort fallback: report the problem and reuse the existing row.
        print(e)
        scraper = Scraper.objects.get(name=venue.name)
    # Count in the database instead of loading every event row.
    scraper.items = DSEvent.objects.filter(scraper=scraper).count()
    scraper.save()
    print("Scraper: ", scraper)
    return scraper, scraper.items, virtcal
# Update item_count of the Scraper at the end of the scrape
def updateScraper(scraper, item_count_start):
    """Store the post-scrape totals on *scraper*.

    Args:
        scraper: Scraper row to update.
        item_count_start: Number of events the scraper had before this run;
            the difference to the current total is saved as ``new_items``.

    Returns:
        None.
    """
    # Let the database count rather than materialising every event row.
    total = DSEvent.objects.filter(scraper=scraper).count()
    scraper.items = total
    scraper.new_items = total - item_count_start
    scraper.last_ran = datetime.now()
    scraper.save()
    return
# Get site HTML content for XPATH travel
def getSource(browser, link):
    """Open *link* in *browser* and return its page source parsed by lxml.

    Args:
        browser: Selenium WebDriver used to load the page.
        link: URL to load.

    Returns:
        The element tree produced by ``html.fromstring`` for the page source.
    """
    browser.get(link)
    # Fixed five-second pause before the page source is read.
    sleep(5)
    return html.fromstring(browser.page_source)
# Get a Selenium WebDriver, with params for Chrome or Firefox,
# or for production, where the browser runs inside a virtual display
def getBrowser(run_env):
    """Return a Selenium WebDriver for the given run environment.

    Args:
        run_env: ``'dev'`` for Chrome, ``'def'`` for Firefox, or ``'prod'``
            for Firefox started after an Xvfb virtual display.

    Returns:
        The started WebDriver instance.

    Raises:
        SystemExit: With exit status 1 when *run_env* is not recognised.
    """
    if run_env == 'dev':
        print("Chrome is a go!")
        return wd.Chrome()
    if run_env == "def":
        print("Firefox go vroom")
        return wd.Firefox()
    if run_env == "prod":
        # NOTE(review): os.system runs this in a child shell, so the
        # `export` cannot change this process's environment, and the
        # backgrounded Xvfb on :91 is never stopped here. The xvfbwrapper
        # instance below starts its own display -- confirm whether the shell
        # command is still needed.
        start_cmd = "Xvfb :91 && export DISPLAY=:91 &"
        xvfb = Xvfb()
        os.system(start_cmd)
        xvfb.start()
        print("started Xvfb")
        return wd.Firefox()
    # Unknown environment: report the offending value and stop the script.
    print("Failed", sys.argv, run_env)
    sys.exit(1)
# Create dated URLs in month/year form (month is not zero-padded)
def createBasicURL(site_url, now=None):
    """Build ``month/year`` URLs for the current and the following month.

    Month numbers are not zero-padded.

    Args:
        site_url: URL prefix the dated suffix is appended to.
        now: Optional datetime treated as the current moment; defaults to
            ``datetime.now()``.

    Returns:
        Two-element list: the current-month URL, then the next-month URL.
    """
    if now is None:
        now = datetime.now()
    month, year = now.month, now.year
    # Derive the following month from the same instant, so both links stay
    # consistent even when the module was imported in an earlier month.
    next_month = month % 12 + 1
    next_year = year + 1 if month == 12 else year
    return [
        f"{site_url}{month}/{year}",
        f"{site_url}{next_month}/{next_year}",
    ]
# Create Dated URL without zero-padded numbers
def createURLNoZero(site_url, now=None):
    """Build ``year/month`` URLs for the current and the following month.

    Month numbers are not zero-padded.

    Args:
        site_url: URL prefix the dated suffix is appended to.
        now: Optional datetime treated as the current moment; defaults to
            ``datetime.now()``.

    Returns:
        Two-element list: the current-month URL, then the next-month URL.
    """
    if now is None:
        now = datetime.now()
    month, year = now.month, now.year
    next_month = month % 12 + 1
    # The next month is January only after December, which means a new year.
    # (The previous int-vs-"1" comparison never detected this.)
    next_year = year + 1 if month == 12 else year
    return [
        f"{site_url}{year}/{month}",
        f"{site_url}{next_year}/{next_month}",
    ]
# Create Dated URL Link with zero-padding
def createURL(site_url, now=None):
    """Build ``year/MM`` URLs for the current and the following month.

    Month numbers are zero-padded to two digits.

    Args:
        site_url: URL prefix the dated suffix is appended to.
        now: Optional datetime treated as the current moment; defaults to
            ``datetime.now()``.

    Returns:
        Two-element list: the current-month URL, then the next-month URL.
    """
    if now is None:
        now = datetime.now()
    month, year = now.month, now.year
    # Derive the following month from the same instant instead of a value
    # computed when the module was imported.
    next_month = month % 12 + 1
    next_year = year + 1 if month == 12 else year
    return [
        f"{site_url}{year}/{month:02d}",
        f"{site_url}{next_year}/{next_month:02d}",
    ]
# Create Dated URL with dashes
def createDashURL(site_url, now=None):
    """Build ``MM-year`` URLs for the current and the following month.

    Month numbers are zero-padded to two digits.

    Args:
        site_url: URL prefix the dated suffix is appended to.
        now: Optional datetime treated as the current moment; defaults to
            ``datetime.now()``.

    Returns:
        Two-element list: the current-month URL, then the next-month URL.
    """
    if now is None:
        now = datetime.now()
    month, year = now.month, now.year
    next_month = month % 12 + 1
    # January following December belongs to the next year.
    next_year = year + 1 if month == 12 else year
    links = [
        f"{site_url}{month:02d}-{year}",
        f"{site_url}{next_month:02d}-{next_year}",
    ]
    print(links)
    return links
# Add Calendar to Event Object (maybe extraneous)
def add_calendar(event, calendar):
    """Attach the Calendar with shortcode *calendar* to *event*.

    Args:
        event: An event object, or a tuple whose first item is the event
            (the shape returned by ``update_or_create``).
        calendar: Shortcode used to look up the Calendar.

    Returns:
        The event object (unwrapped from the tuple when one was given).
    """
    target = event[0] if type(event) is tuple else event
    matched = Calendar.objects.get(shortcode=calendar)
    target.calendar.add(matched)
    target.save()
    return target
# Add Calendars to Event Object ??
def add_calendars(event, data):
    """Attach every calendar listed under ``data['calendars']`` to *event*.

    Args:
        event: Event object exposing ``calendar.add`` and ``save``.
        data: Mapping whose ``'calendars'`` entry is either a single calendar
            or a list of calendars.

    Returns:
        The same *event* object.
    """
    calendars = data['calendars']
    if type(calendars) is list:
        for entry in calendars:
            event.calendar.add(entry)
    else:
        # A lone calendar is added as-is.
        event.calendar.add(calendars)
    event.save()
    return event
# Create Basic DigiSnaxx Event
def createBasicEvent(event, event_type, venue):
    """Create or fetch a basic DigiSnaxx event and attach its calendars.

    Args:
        event: Mapping with ``'title'``, ``'link'``, ``'dateStamp'``,
            ``'scraper'`` and ``'calendars'`` entries.
        event_type: Value stored as the event's ``event_type``.
        venue: Value stored as the event's ``venue``.

    Returns:
        Tuple ``(new_event, created)`` as reported by ``update_or_create``.
    """
    # Every value is passed as a lookup argument (no ``defaults``).
    fields = {
        'event_type': event_type,
        'show_title': event['title'],
        'show_link': event['link'],
        'show_date': event['dateStamp'],
        'show_day': event['dateStamp'],
        'scraper': event['scraper'],
        'venue': venue,
    }
    new_event, created = DSEvent.objects.update_or_create(**fields)
    new_event = add_calendars(new_event, event)
    print("\n+new event+\n")
    return new_event, created
# Create iCal Event
def createBasiciCalEvent(event, event_type, venue):
    """Create or fetch an event from an iCal-derived mapping.

    Args:
        event: Mapping whose ``'title'`` and ``'dateStamp'`` entries are
            indexable (item 0 is used), plus ``'link'``, ``'scraper'`` and
            ``'calendars'`` entries. ``dateStamp[0]`` must render as
            ``YYYY-MM-DD HH:MM:SS``.
        event_type: Value stored as the event's ``event_type``.
        venue: Value stored as the event's ``venue``.

    Returns:
        Tuple ``(new_event, created)`` as reported by ``update_or_create``.
    """
    title = event['title'][0]
    link = event['link']
    # The same parsed timestamp fills both the date and the day field.
    when = datetime.strptime(str(event['dateStamp'][0]), '%Y-%m-%d %H:%M:%S')
    new_event, created = DSEvent.objects.update_or_create(
        event_type=event_type,
        show_title=title,
        show_link=link,
        show_date=when,
        show_day=when,
        scraper=event['scraper'],
        venue=venue,
    )
    new_event = add_calendars(new_event, event)
    print("\n+new event+\n")
    return new_event, created
# Create Detailed Event with Details & Guests
# Details in JSON Format
def createDetailedEvent(event, event_type, venue, scraper):
    """Create or fetch an event that carries guests and extra details.

    Args:
        event: Mapping with ``'show_title'``, ``'link'``, ``'dateStamp'``,
            ``'guests'`` (iterable of strings), ``'details'``, ``'scraper'``
            and ``'calendars'`` entries.
        event_type: Value stored as the event's ``event_type``.
        venue: Value stored as the event's ``venue``.
        scraper: Not read by this function; the scraper is taken from
            ``event['scraper']``.

    Returns:
        Tuple ``(new_event, created)`` as reported by ``update_or_create``.
    """
    fields = {
        'event_type': event_type,
        'show_title': event["show_title"],
        'show_link': event["link"],
        'show_date': event["dateStamp"],
        'show_day': event['dateStamp'],
        # Guests are stored as one space-separated string.
        'guests': " ".join(event["guests"]),
        'more_details': event["details"],
        'scraper': event['scraper'],
        'venue': venue,
    }
    new_event, created = DSEvent.objects.update_or_create(**fields)
    new_event = add_calendars(new_event, event)
    print("\n+new event+\n")
    return new_event, created
# Create iCal event from DF_Online & Medellin
def createCleanIcalEvent(event, scraper, venue, event_type):
    """Repackage an iCal event mapping and hand it to createBasiciCalEvent.

    Args:
        event: Mapping with ``'dateStart'`` and ``'strSummary'`` entries.
        scraper: Scraper whose ``calendar`` is attached to the new event.
        venue: Venue of the event; its ``website`` becomes the event link.
        event_type: Value forwarded as the event type.

    Returns:
        None.
    """
    start_text = str(event['dateStart'])
    payload = {
        'scraper': scraper,
        'calendars': scraper.calendar,
        # One-element tuples: createBasiciCalEvent reads item [0] of the
        # title and dateStamp entries.
        'title': (event['strSummary'],),
        'date': (start_text,),
        'dateStamp': (start_text,),
        'link': venue.website,
    }
    createBasiciCalEvent(payload, event_type, venue)
# Get events from iCal
def getiCalEvents(gcal, scraper, venue, event_type):
    """Collect event mappings from the components of an iCal calendar.

    Args:
        gcal: Calendar object whose ``walk()`` yields components supporting
            ``get``.
        scraper: Scraper stored on each event; its ``calendar`` is wrapped in
            a one-item list under ``'calendars'``.
        venue: Not read by this function.
        event_type: Not read by this function.

    Returns:
        List of event mappings, one per component that has a SUMMARY.
    """
    collected = []
    for component in gcal.walk():
        event = {
            'scraper': scraper,
            'calendars': [scraper.calendar],
            'strSummary': f"{(component.get('SUMMARY'))}",
            'strDesc': component.get('DESCRIPTION'),
            'strLocation': component.get('LOCATION'),
            'dateStart': component.get('DTSTART'),
            'dateStamp': component.get('DTSTAMP'),
        }
        # Replace the iCal property wrappers with their ``dt`` values.
        if event['dateStamp'] is not None:
            event['dateStamp'] = event['dateStamp'].dt
        if event['dateStart'] is not None:
            try:
                event['dateStart'] = event['dateStart'].dt
            except Exception as e:
                print("what? ", e)
        # A missing SUMMARY was formatted to the string 'None' above.
        if event['strSummary'] == 'None':
            continue
        event['details'] = {
            "description": event['strDesc'],
            "Location": event['strLocation'],
        }
        collected.append(event)
    return collected
# Build iCal Events and Send to Create
def buildiCalEvents(events, event_type, scraper, venue=None):
    """Create a basic event for each iCal event mapping.

    Args:
        events: Iterable of mappings with ``'calendars'``, ``'dateStart'``
            and ``'strSummary'`` entries (and optionally ``'venue'``).
        event_type: Value forwarded as the event type.
        scraper: Scraper stored on each event; its ``items`` counter is
            incremented per created event and saved once at the end.
        venue: Optional venue used for every event. When omitted, each
            event's own ``'venue'`` entry is used instead.

    Returns:
        None.
    """
    for event in events:
        # Prefer the explicit venue, else one attached to the event itself.
        event_venue = venue if venue is not None else event.get('venue')
        if event_venue is None:
            print("Error: ", f"no venue available for event {event.get('strSummary')!r}")
            continue
        start = event['dateStart']
        # Some producers wrap the start in a sequence; use its first item.
        if isinstance(start, (list, tuple)) and start:
            start = start[0]
        payload = {
            'calendars': event['calendars'],
            'dateStamp': start,
            'title': event['strSummary'],
            'scraper': scraper,
            'link': event_venue.website,
        }
        try:
            createBasicEvent(payload, event_type, event_venue)
            scraper.items += 1
        except Exception as err:
            # Best effort: report the failure and carry on with the rest.
            print("Error: ", err)
    scraper.save()
    return
def getMDEVenue(venue, event=None):
    """Set a known website on *venue* and save it.

    Args:
        venue: Object with ``name``, ``website`` and ``save()``. The website
            is overwritten only when the name (or the event summary) matches
            one of the known entries; the venue is saved either way.
        event: Optional mapping with a ``'strSummary'`` entry. When given,
            certain summaries override the website chosen by name.

    Returns:
        None.
    """
    websites_by_name = {
        "DANCEFREE": "https://www.instagram.com/dancefreeco",
        "Vintrash": "https://www.instagram.com/vintrashbar",
        "The Wandering Paisa": "https://wanderingpaisahostel.com",
        "Dulce Posion": "https://www.instagram.com/dulceposionr",
        "Blood Dance Company": "https://www.instagram.com/blooddancecompany",
        "OLSA Certified Spanish School": "https://www.olsafoundation.org/",
    }
    if venue.name in websites_by_name:
        venue.website = websites_by_name[venue.name]
    # Summary-based overrides apply only when the caller supplies the event.
    summary = event.get('strSummary') if event else None
    if summary:
        if summary == "Merli Rooftop Language Exchange":
            venue.website = "https://calendar.google.com/calendar/embed?src=46ae0446724b1b3ee83cbd7dbc0db6a235bf97509ad860ca91eada3c267b5e41%40group.calendar.google.com&ctz=America%2FBogota"
        if "Concious Warrior" in summary:
            venue.website = "https://www.consciouscolombia.com/"
    venue.save()
    return
# Get iCal events for Medellin & OnlineEvents
def getiCalRepeateEvents(gcal, scraper, venue, event_type, cal):
    """Create upcoming occurrences for weekly-recurring iCal events.

    For every component that has a SUMMARY, DTSTAMP, DTSTART and a weekly
    recurrence rule, the venue is derived from the location and one
    occurrence per BYDAY entry is generated for this and the following weeks.

    Args:
        gcal: Calendar object whose ``walk()`` yields components supporting
            ``get``.
        scraper: Scraper stored on each event; its ``calendar`` is attached.
        venue: Not read by this function; the venue comes from the location.
        event_type: Not read by this function; occurrences are created with
            the fixed type ``"Ed"``.
        cal: Calendar code; ``'mde'`` additionally applies the known-venue
            website fix-ups.

    Returns:
        None.
    """
    weekday_codes = ["SU", "MO", "TU", "WE", "TH", "FR", "SA"]
    for component in gcal.walk():
        summary = f"{(component.get('SUMMARY'))}"
        if summary == 'None':
            continue
        event = {
            'scraper': scraper,
            'calendars': [scraper.calendar],
            'strSummary': summary,
            'strDesc': component.get('DESCRIPTION'),
            'strLocation': str(component.get('LOCATION')),
            'dateStart': component.get('DTSTART'),
            'dateStamp': component.get('DTSTAMP'),
        }
        event['details'] = {
            "description": event['strDesc'],
            "Location": event['strLocation'],
        }
        if event['dateStamp'] is None or event['dateStart'] is None:
            continue
        start = event['dateStart'].dt
        if not isinstance(start, datetime):
            # Date-only (all-day) starts carry no time of day to repeat.
            print("Error: ", f"unsupported DTSTART {start!r} for {summary!r}", "\n\n\n\n")
            continue
        # Drop the UTC offset but keep the wall-clock time.
        event['dateStart'] = start.replace(tzinfo=None)
        rules = component.get('RRule')
        if rules is None:
            # Not a recurring event.
            continue
        try:
            if rules['FREQ'][0] != 'WEEKLY':
                continue
            today = datetime.today()
            # NOTE(review): nothing is generated when run on a Monday; kept
            # from the original logic -- confirm this is intended.
            if today.weekday() == 0:
                continue
            event = splitLocation(event, city="Medellin")
            # Monday of the current week, at the event's time of day.
            week_start = today.date() - timedelta(days=today.weekday())
            week_start = datetime.combine(week_start, event['dateStart'].time())
            for code in rules['BYDAY']:
                day_index = weekday_codes.index(code)
                if cal == 'mde':
                    # NOTE(review): the original called an undefined
                    # getVenue(); getMDEVenue is presumably the intended
                    # helper -- confirm.
                    getMDEVenue(event['venue'])
                iCalEventRepeatFilterteEvent(
                    day_index, week_start, event, scraper, event['venue'], "Ed"
                )
        except Exception as e:
            # Best effort: report the failure and move on to the next event.
            print("Error: ", e, "\n\n\n\n")
def iCalEventRepeatFilterteEvent(day, date, event, scraper, venue, event_type):
    """Create three weekly occurrences of a recurring event.

    Args:
        day: Weekday index used to compute offsets from *date*; the offsets
            are ``day - 1``, ``day + 6`` and ``day + 13`` days.
        date: Base datetime the offsets are added to.
        event: Event mapping; its ``'dateStamp'`` and ``'dateStart'`` entries
            are overwritten for each occurrence.
        scraper: Scraper forwarded to createCleanIcalEvent.
        venue: Venue forwarded to createCleanIcalEvent.
        event_type: Event type forwarded to createCleanIcalEvent.

    Returns:
        None.
    """
    for offset in (day - 1, day + 6, day + 13):
        event['dateStamp'] = date + timedelta(days=offset)
        event['dateStart'] = event['dateStamp']
        createCleanIcalEvent(event, scraper, venue, event_type)
    return


# Shorter alias for callers that use this name.
iCalEventRepeatFilter = iCalEventRepeatFilterteEvent
def splitLocation(event, city=None, **kwargs):
    """Derive the venue from an event's location and attach it to the event.

    The venue name is the text before the first comma of
    ``event['strLocation']``; the matching Organization is fetched or created.

    Args:
        event: Mapping with a ``'strLocation'`` string; gains a ``'venue'``
            entry.
        city: Optional value stored on the venue's ``city`` attribute.
        **kwargs: Accepted for backward compatibility; not read.

    Returns:
        The same *event* mapping.
    """
    venue_name = event['strLocation'].split(',')[0]
    venue, created = Organization.objects.get_or_create(
        name=venue_name,
    )
    event['venue'] = venue
    if city:
        venue.city = city
        venue.save()
    return event
# ARCHIVED Methods
def createBasicArticle(article, event_type, organization):
    """Create or fetch a published Promo of type ``'Ja'`` for an article.

    Args:
        article: Mapping with ``'title'`` and ``'link'`` entries.
        event_type: Not read by this function.
        organization: Value stored as the promo's ``organization``.

    Returns:
        Tuple ``(new_article, created)`` as reported by ``update_or_create``.
    """
    promo_fields = {
        'promo_type': 'Ja',
        'title': article['title'],
        'target_link': article['link'],
        'published': True,
        'organization': organization,
    }
    new_article, created = Promo.objects.update_or_create(**promo_fields)
    return new_article, created