import asyncio import html import json import random from time import time from typing import Annotated, Any, Optional from zoneinfo import ZoneInfo import discord from discord import app_commands from discord.ext import commands from discord.ext.commands import Context from discord.utils import get from discord.utils import * from helpers import checks, db_manager # This is the Weather module class Weather(commands.Cog, name="weather"): def __init__(self, bot): = bot with open("data/weather.json") as f: self.weather_constants = json.load(f) async def weather(self, ctx: commands.Context): """Show current weather in given location""" if ctx.invoked_subcommand is None: await util.command_group_help(ctx) else: ctx.location = await "SELECT location_string FROM user_settings WHERE user_id = %s",, ) @weather.command(name="now") async def weather_now(self, ctx: commands.Context, *, location: Optional[str] = None): if location is None: location = ctx.location if ctx.location is None: raise exceptions.CommandInfo( f"Please save your location using `{ctx.prefix}weather save `" ) lat, lon, address = await self.geolocate(location) local_time, country_code = await self.get_country_information(lat, lon) API_BASE_URL = "" params = { "apikey":, "location": f"{lat},{lon}", "fields": ",".join( [ "precipitationProbability", "precipitationType", "windSpeed", "windGust", "windDirection", "temperature", "temperatureApparent", "cloudCover", "weatherCode", "humidity", "temperatureMin", "temperatureMax", "sunriseTime", "sunsetTime", ] ), "units": "metric", "timesteps": ",".join(["current", "1d"]), "endTime": arrow.utcnow().shift(days=+1, minutes=+5).isoformat(), } if isinstance(local_time.tzinfo, ZoneInfo): params["timezone"] = str(local_time.tzinfo) else: logger.warning("Arrow object must be constructed with ZoneInfo timezone object") async with, params=params) as response: if response.status != 200: logger.error(response.status) logger.error(response.headers) logger.error(await response.text()) raise exceptions.CommandError(f"Weather api returned HTTP ERROR {response.status}") data = await response.json(loads=orjson.loads) current_data = next( filter(lambda t: t["timestep"] == "current", data["data"]["timelines"]) ) daily_data = next(filter(lambda t: t["timestep"] == "1d", data["data"]["timelines"])) values_current = current_data["intervals"][0]["values"] values_today = daily_data["intervals"][0]["values"] # values_tomorrow = daily_data["intervals"][1]["values"] temperature = values_current["temperature"] temperature_apparent = values_current["temperatureApparent"] sunrise = arrow.get(values_current["sunriseTime"]).to(local_time.tzinfo).format("HH:mm") sunset = arrow.get(values_current["sunsetTime"]).to(local_time.tzinfo).format("HH:mm") icon = self.weather_constants["id_to_icon"][str(values_current["weatherCode"])] summary = self.weather_constants["id_to_description"][str(values_current["weatherCode"])] if ( values_today["precipitationType"] != 0 and values_today["precipitationProbability"] != 0 ): precipitation_type = self.weather_constants["precipitation"][ str(values_today["precipitationType"]) ] summary += f", with {values_today['precipitationProbability']}% chance of {precipitation_type}" content = discord.Embed(color=int("e1e8ed", 16)) content.title = f":flag_{country_code.lower()}: {address}" content.set_footer(text=f"🕐 Local time {local_time.format('HH:mm')}") def render(F: bool): information_rows = [ f":thermometer: Currently **{temp(temperature, F)}**, feels like **{temp(temperature_apparent, F)}**", f":calendar: Daily low **{temp(values_today['temperatureMin'], F)}**, high **{temp(values_today['temperatureMax'], F)}**", f":dash: Wind speed **{values_current['windSpeed']} m/s** with gusts of **{values_current['windGust']} m/s**", f":sunrise: Sunrise at **{sunrise}**, sunset at **{sunset}**", f":sweat_drops: Air humidity **{values_current['humidity']}%**", f":map: [See on map]({lat},{lon})", ] content.clear_fields().add_field( name=f"{icon} {summary}", value="\n".join(information_rows), ) return content await WeatherUnitToggler(render, False).run(ctx) @weather.command(name="forecast") async def weather_forecast(self, ctx: commands.Context, *, location: Optional[str] = None): if location is None: location = ctx.location if ctx.location is None: raise exceptions.CommandInfo( f"Please save your location using `{ctx.prefix}weather save `" ) lat, lon, address = await self.geolocate(location) local_time, country_code = await self.get_country_information(lat, lon) API_BASE_URL = "" params = { "apikey":, "location": f"{lat},{lon}", "fields": ",".join( [ "precipitationProbability", "precipitationType", "windSpeed", "windGust", "windDirection", "temperature", "temperatureApparent", "cloudCover", "weatherCode", "humidity", "temperatureMin", "temperatureMax", ] ), "units": "metric", "timesteps": "1d", "endTime": arrow.utcnow().shift(days=+7).isoformat(), } if isinstance(local_time.tzinfo, ZoneInfo): params["timezone"] = str(local_time.tzinfo) else: logger.warning("Arrow object must be constructed with ZoneInfo timezone object") async with, params=params) as response: if response.status != 200: logger.error(response.status) logger.error(response.headers) logger.error(await response.text()) raise exceptions.CommandError(f"Weather api returned HTTP ERROR {response.status}") data = await response.json(loads=orjson.loads) content = discord.Embed( title=f":flag_{country_code.lower()}: {address}", color=int("ffcc4d", 16), ) def render(F: bool): days = [] for day in data["data"]["timelines"][0]["intervals"]: date = arrow.get(day["startTime"]).format("**`ddd`** `D/M`") values = day["values"] minTemp = values["temperatureMin"] maxTemp = values["temperatureMax"] icon = self.weather_constants["id_to_icon"][str(values["weatherCode"])] description = self.weather_constants["id_to_description"][ str(values["weatherCode"]) ] days.append( f"{date} {icon} **{temp(maxTemp, F)}** / **{temp(minTemp, F)}** — {description}" ) content.description = "\n".join(days) return content await WeatherUnitToggler(render, False).run(ctx) @weather.command(name="save") async def weather_save(self, ctx: commands.Context, *, location: str): await """ INSERT INTO user_settings (user_id, location_string) VALUES (%s, %s) ON DUPLICATE KEY UPDATE location_string = VALUES(location_string) """,, location, ) return await util.send_success(ctx, f"Saved your location as `{location}`") async def geolocate(self, location): GOOGLE_GEOCODING_API_URL = "" params = {"address": location, "key":} async with, params=params) as response: geocode_data = await response.json(loads=orjson.loads) try: geocode_data = geocode_data["results"][0] except IndexError: raise exceptions.CommandWarning("Could not find that location!") formatted_name = geocode_data["formatted_address"] lat = geocode_data["geometry"]["location"]["lat"] lon = geocode_data["geometry"]["location"]["lng"] return lat, lon, formatted_name async def get_country_information(self, lat, lon): TIMEZONE_API_URL = "" params = { "key":, "format": "json", "by": "position", "lat": lat, "lng": lon, } async with, params=params) as response: data = await response.json(loads=orjson.loads) country_code = data["countryCode"] try: local_time =["zoneName"])) except ValueError: # does not have a time zone # most likely a special place such as antarctica # use UTC local_time = arrow.utcnow() return local_time, country_code async def setup(bot): await bot.add_cog(Weather(bot))