264 lines
10 KiB
Python
264 lines
10 KiB
Python
|
import asyncio
|
||
|
import html
|
||
|
import json
|
||
|
import random
|
||
|
from time import time
|
||
|
from typing import Annotated, Any, Optional
|
||
|
from zoneinfo import ZoneInfo
|
||
|
|
||
|
import discord
|
||
|
from discord import app_commands
|
||
|
from discord.ext import commands
|
||
|
from discord.ext.commands import Context
|
||
|
from discord.utils import get
|
||
|
|
||
|
from discord.utils import *
|
||
|
|
||
|
from helpers import checks, db_manager
|
||
|
|
||
|
# This is the Weather module
|
||
|
class Weather(commands.Cog, name="weather"):
|
||
|
|
||
|
def __init__(self, bot):
|
||
|
self.bot = bot
|
||
|
with open("data/weather.json") as f:
|
||
|
self.weather_constants = json.load(f)
|
||
|
|
||
|
|
||
|
@commands.group()
|
||
|
async def weather(self, ctx: commands.Context):
|
||
|
"""Show current weather in given location"""
|
||
|
if ctx.invoked_subcommand is None:
|
||
|
await util.command_group_help(ctx)
|
||
|
else:
|
||
|
ctx.location = await self.bot.db.fetch_value(
|
||
|
"SELECT location_string FROM user_settings WHERE user_id = %s",
|
||
|
ctx.author.id,
|
||
|
)
|
||
|
|
||
|
@weather.command(name="now")
|
||
|
async def weather_now(self, ctx: commands.Context, *, location: Optional[str] = None):
|
||
|
if location is None:
|
||
|
location = ctx.location
|
||
|
if ctx.location is None:
|
||
|
raise exceptions.CommandInfo(
|
||
|
f"Please save your location using `{ctx.prefix}weather save <location...>`"
|
||
|
)
|
||
|
|
||
|
lat, lon, address = await self.geolocate(location)
|
||
|
local_time, country_code = await self.get_country_information(lat, lon)
|
||
|
|
||
|
API_BASE_URL = "https://api.tomorrow.io/v4/timelines"
|
||
|
params = {
|
||
|
"apikey": self.bot.keychain.TOMORROWIO_TOKEN,
|
||
|
"location": f"{lat},{lon}",
|
||
|
"fields": ",".join(
|
||
|
[
|
||
|
"precipitationProbability",
|
||
|
"precipitationType",
|
||
|
"windSpeed",
|
||
|
"windGust",
|
||
|
"windDirection",
|
||
|
"temperature",
|
||
|
"temperatureApparent",
|
||
|
"cloudCover",
|
||
|
"weatherCode",
|
||
|
"humidity",
|
||
|
"temperatureMin",
|
||
|
"temperatureMax",
|
||
|
"sunriseTime",
|
||
|
"sunsetTime",
|
||
|
]
|
||
|
),
|
||
|
"units": "metric",
|
||
|
"timesteps": ",".join(["current", "1d"]),
|
||
|
"endTime": arrow.utcnow().shift(days=+1, minutes=+5).isoformat(),
|
||
|
}
|
||
|
|
||
|
if isinstance(local_time.tzinfo, ZoneInfo):
|
||
|
params["timezone"] = str(local_time.tzinfo)
|
||
|
else:
|
||
|
logger.warning("Arrow object must be constructed with ZoneInfo timezone object")
|
||
|
|
||
|
async with self.bot.session.get(API_BASE_URL, params=params) as response:
|
||
|
if response.status != 200:
|
||
|
logger.error(response.status)
|
||
|
logger.error(response.headers)
|
||
|
logger.error(await response.text())
|
||
|
raise exceptions.CommandError(f"Weather api returned HTTP ERROR {response.status}")
|
||
|
data = await response.json(loads=orjson.loads)
|
||
|
|
||
|
current_data = next(
|
||
|
filter(lambda t: t["timestep"] == "current", data["data"]["timelines"])
|
||
|
)
|
||
|
daily_data = next(filter(lambda t: t["timestep"] == "1d", data["data"]["timelines"]))
|
||
|
values_current = current_data["intervals"][0]["values"]
|
||
|
values_today = daily_data["intervals"][0]["values"]
|
||
|
# values_tomorrow = daily_data["intervals"][1]["values"]
|
||
|
temperature = values_current["temperature"]
|
||
|
temperature_apparent = values_current["temperatureApparent"]
|
||
|
sunrise = arrow.get(values_current["sunriseTime"]).to(local_time.tzinfo).format("HH:mm")
|
||
|
sunset = arrow.get(values_current["sunsetTime"]).to(local_time.tzinfo).format("HH:mm")
|
||
|
|
||
|
icon = self.weather_constants["id_to_icon"][str(values_current["weatherCode"])]
|
||
|
summary = self.weather_constants["id_to_description"][str(values_current["weatherCode"])]
|
||
|
|
||
|
if (
|
||
|
values_today["precipitationType"] != 0
|
||
|
and values_today["precipitationProbability"] != 0
|
||
|
):
|
||
|
precipitation_type = self.weather_constants["precipitation"][
|
||
|
str(values_today["precipitationType"])
|
||
|
]
|
||
|
summary += f", with {values_today['precipitationProbability']}% chance of {precipitation_type}"
|
||
|
|
||
|
content = discord.Embed(color=int("e1e8ed", 16))
|
||
|
content.title = f":flag_{country_code.lower()}: {address}"
|
||
|
content.set_footer(text=f"🕐 Local time {local_time.format('HH:mm')}")
|
||
|
|
||
|
def render(F: bool):
|
||
|
information_rows = [
|
||
|
f":thermometer: Currently **{temp(temperature, F)}**, feels like **{temp(temperature_apparent, F)}**",
|
||
|
f":calendar: Daily low **{temp(values_today['temperatureMin'], F)}**, high **{temp(values_today['temperatureMax'], F)}**",
|
||
|
f":dash: Wind speed **{values_current['windSpeed']} m/s** with gusts of **{values_current['windGust']} m/s**",
|
||
|
f":sunrise: Sunrise at **{sunrise}**, sunset at **{sunset}**",
|
||
|
f":sweat_drops: Air humidity **{values_current['humidity']}%**",
|
||
|
f":map: [See on map](https://www.google.com/maps/search/?api=1&query={lat},{lon})",
|
||
|
]
|
||
|
|
||
|
content.clear_fields().add_field(
|
||
|
name=f"{icon} {summary}",
|
||
|
value="\n".join(information_rows),
|
||
|
)
|
||
|
|
||
|
return content
|
||
|
|
||
|
await WeatherUnitToggler(render, False).run(ctx)
|
||
|
|
||
|
@weather.command(name="forecast")
|
||
|
async def weather_forecast(self, ctx: commands.Context, *, location: Optional[str] = None):
|
||
|
if location is None:
|
||
|
location = ctx.location
|
||
|
if ctx.location is None:
|
||
|
raise exceptions.CommandInfo(
|
||
|
f"Please save your location using `{ctx.prefix}weather save <location...>`"
|
||
|
)
|
||
|
|
||
|
lat, lon, address = await self.geolocate(location)
|
||
|
local_time, country_code = await self.get_country_information(lat, lon)
|
||
|
API_BASE_URL = "https://api.tomorrow.io/v4/timelines"
|
||
|
params = {
|
||
|
"apikey": self.bot.keychain.TOMORROWIO_TOKEN,
|
||
|
"location": f"{lat},{lon}",
|
||
|
"fields": ",".join(
|
||
|
[
|
||
|
"precipitationProbability",
|
||
|
"precipitationType",
|
||
|
"windSpeed",
|
||
|
"windGust",
|
||
|
"windDirection",
|
||
|
"temperature",
|
||
|
"temperatureApparent",
|
||
|
"cloudCover",
|
||
|
"weatherCode",
|
||
|
"humidity",
|
||
|
"temperatureMin",
|
||
|
"temperatureMax",
|
||
|
]
|
||
|
),
|
||
|
"units": "metric",
|
||
|
"timesteps": "1d",
|
||
|
"endTime": arrow.utcnow().shift(days=+7).isoformat(),
|
||
|
}
|
||
|
|
||
|
if isinstance(local_time.tzinfo, ZoneInfo):
|
||
|
params["timezone"] = str(local_time.tzinfo)
|
||
|
else:
|
||
|
logger.warning("Arrow object must be constructed with ZoneInfo timezone object")
|
||
|
|
||
|
async with self.bot.session.get(API_BASE_URL, params=params) as response:
|
||
|
if response.status != 200:
|
||
|
logger.error(response.status)
|
||
|
logger.error(response.headers)
|
||
|
logger.error(await response.text())
|
||
|
raise exceptions.CommandError(f"Weather api returned HTTP ERROR {response.status}")
|
||
|
data = await response.json(loads=orjson.loads)
|
||
|
|
||
|
content = discord.Embed(
|
||
|
title=f":flag_{country_code.lower()}: {address}",
|
||
|
color=int("ffcc4d", 16),
|
||
|
)
|
||
|
|
||
|
def render(F: bool):
|
||
|
days = []
|
||
|
for day in data["data"]["timelines"][0]["intervals"]:
|
||
|
date = arrow.get(day["startTime"]).format("**`ddd`** `D/M`")
|
||
|
values = day["values"]
|
||
|
minTemp = values["temperatureMin"]
|
||
|
maxTemp = values["temperatureMax"]
|
||
|
icon = self.weather_constants["id_to_icon"][str(values["weatherCode"])]
|
||
|
description = self.weather_constants["id_to_description"][
|
||
|
str(values["weatherCode"])
|
||
|
]
|
||
|
days.append(
|
||
|
f"{date} {icon} **{temp(maxTemp, F)}** / **{temp(minTemp, F)}** — {description}"
|
||
|
)
|
||
|
|
||
|
content.description = "\n".join(days)
|
||
|
return content
|
||
|
|
||
|
await WeatherUnitToggler(render, False).run(ctx)
|
||
|
|
||
|
@weather.command(name="save")
|
||
|
async def weather_save(self, ctx: commands.Context, *, location: str):
|
||
|
await self.bot.db.execute(
|
||
|
"""
|
||
|
INSERT INTO user_settings (user_id, location_string)
|
||
|
VALUES (%s, %s)
|
||
|
ON DUPLICATE KEY UPDATE
|
||
|
location_string = VALUES(location_string)
|
||
|
""",
|
||
|
ctx.author.id,
|
||
|
location,
|
||
|
)
|
||
|
return await util.send_success(ctx, f"Saved your location as `{location}`")
|
||
|
|
||
|
async def geolocate(self, location):
|
||
|
GOOGLE_GEOCODING_API_URL = "https://maps.googleapis.com/maps/api/geocode/json"
|
||
|
params = {"address": location, "key": self.bot.keychain.GCS_DEVELOPER_KEY}
|
||
|
async with self.bot.session.get(GOOGLE_GEOCODING_API_URL, params=params) as response:
|
||
|
geocode_data = await response.json(loads=orjson.loads)
|
||
|
try:
|
||
|
geocode_data = geocode_data["results"][0]
|
||
|
except IndexError:
|
||
|
raise exceptions.CommandWarning("Could not find that location!")
|
||
|
|
||
|
formatted_name = geocode_data["formatted_address"]
|
||
|
lat = geocode_data["geometry"]["location"]["lat"]
|
||
|
lon = geocode_data["geometry"]["location"]["lng"]
|
||
|
|
||
|
return lat, lon, formatted_name
|
||
|
|
||
|
async def get_country_information(self, lat, lon):
|
||
|
TIMEZONE_API_URL = "http://api.timezonedb.com/v2.1/get-time-zone"
|
||
|
params = {
|
||
|
"key": self.bot.keychain.TIMEZONEDB_API_KEY,
|
||
|
"format": "json",
|
||
|
"by": "position",
|
||
|
"lat": lat,
|
||
|
"lng": lon,
|
||
|
}
|
||
|
async with self.bot.session.get(TIMEZONE_API_URL, params=params) as response:
|
||
|
data = await response.json(loads=orjson.loads)
|
||
|
country_code = data["countryCode"]
|
||
|
try:
|
||
|
local_time = arrow.now(ZoneInfo(data["zoneName"]))
|
||
|
except ValueError:
|
||
|
# does not have a time zone
|
||
|
# most likely a special place such as antarctica
|
||
|
# use UTC
|
||
|
local_time = arrow.utcnow()
|
||
|
return local_time, country_code
|
||
|
|
||
|
async def setup(bot):
|
||
|
await bot.add_cog(Weather(bot))
|