Use rxjs observables for blockchain data

This commit is contained in:
David Keathley 2023-04-21 15:51:09 -07:00
parent 4f5812c2b8
commit deee0f205a
7 changed files with 158 additions and 3510 deletions

3491
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -13,12 +13,11 @@
"dependencies": {
"discord.js": "^14.9.0",
"dotenv": "^16.0.3",
"ethers": "^6.3.0",
"redis": "^4.6.5",
"web3": "^1.9.0"
"rxjs": "^7.8.0"
},
"devDependencies": {
"@types/dotenv": "^8.2.0",
"@types/redis": "^4.0.11",
"typescript": "^4.9.5"
}
}

View File

@ -1,61 +1,45 @@
import 'dotenv/config.js';
import Web3 from 'web3';
import { WebSocketProvider, formatUnits } from 'ethers';
import { Observable } from 'rxjs';
import { map, scan } from 'rxjs/operators';
import { GasPrices } from '../types/gasPrices'
import redisClient from './redis';
import { GasPrices } from '../types/gasPrices';
const rpcUrl = process.env.RPC_URL || 'ws://localhost:8545';
const rpcUrl = process.env.RPC_URL || "wss://ropsten.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID";
// Create a new web3 instance
const web3 = new Web3(new Web3.providers.WebsocketProvider(rpcUrl, {
reconnect: {
auto: true,
delay: 5000,
maxAttempts: 5,
onTimeout: false
const provider = new WebSocketProvider(rpcUrl);
const blockGasPricesObservable = new Observable<GasPrices>((observer) => {
provider.on('block', async (blockNumber) => {
try {
const block = await provider.getBlock(blockNumber, true);
if (!block) throw new Error(`Error fetching block! ${blockNumber}`);
const gasPrices = block.prefetchedTransactions.map((tx) => tx.gasPrice);
const fast = Number(formatUnits(gasPrices[Math.floor(gasPrices.length * 0.9)], "gwei"));
const average = Number(formatUnits(gasPrices[Math.floor(gasPrices.length / 2)], "gwei"));
const slow = Number(formatUnits(gasPrices[Math.floor(gasPrices.length * 0.05)], "gwei"));
observer.next({ fast, average, slow } as GasPrices);
} catch (error) {
observer.error(`Error fetching block! ${error}`);
}
}));
export const subToBlockHeaders = (setDiscordStatus: () => Promise<void>) => {
web3.eth.subscribe('newBlockHeaders', (error, blockHeader) => {
if (error) console.error(error);
const shouldLogWei = blockHeader.number % 10 === 0;
// Get the gas price for this block
web3.eth.getGasPrice((error, gasPrice) => {
if (error) console.error(error);
if (shouldLogWei) console.log('Gas price in wei:', gasPrice);
redisClient.set('gas-price', Math.round(Number(gasPrice)))
});
});
// Set status every block
setDiscordStatus()
});
}
const gweiFromWei = (priceInWei: number): number =>
Math.round(Number(web3.utils.fromWei(`${Math.round(priceInWei)}`, 'gwei')));
const getGasPrice = async (): Promise<number> => {
const gasPrice = await redisClient.get('gas-price');
return Number(gasPrice);
}
const getGasPricesInGwei = async (): Promise<GasPrices> => {
const gasPrice = await getGasPrice()
const fastPrice = gasPrice * 1.1;
const slowPrice = gasPrice * 0.9;
const gasPrices = {
fast: gweiFromWei(fastPrice),
average: gweiFromWei(gasPrice),
slow: gweiFromWei(slowPrice),
const averageGasPricesObservable = blockGasPricesObservable.pipe(
scan((acc, curr) => [...acc.slice(-19), curr], [] as GasPrices[]),
map((blocks) => {
const fastSum = blocks.reduce((sum, block) => sum + block.fast, 0);
const averageSum = blocks.reduce((sum, block) => sum + block.average, 0);
const slowSum = blocks.reduce((sum, block) => sum + block.slow, 0);
return {
fast: fastSum / blocks.length,
average: averageSum / blocks.length,
slow: slowSum / blocks.length,
};
})
);
return gasPrices;
};
export { getGasPricesInGwei };
export { averageGasPricesObservable };

View File

@ -5,6 +5,6 @@ module.exports = {
name: Events.ClientReady,
once: true,
execute(client: DiscordClient) {
if (client.user) return console.log(`Ready! Logged in as ${client.user.id}: ${client.user.tag}`);
if (client.user) return console.log(`Ready! Logged in as ${client.user.tag}`);
}
};

View File

@ -1,22 +1,20 @@
import { EmbedBuilder, TextChannel } from 'discord.js';
import { getGasPricesInGwei } from './blockchain';
import { DiscordClient } from './discordClient';
import redisClient from './redis';
import { GasAlert } from '../types/gasAlert';
import { GasPrices } from '../types/gasPrices';
const createGasPriceChecker = (client: DiscordClient) => {
setInterval(async () => {
const createGasAlertChecker = (client: DiscordClient) =>
async (gasPrices: GasPrices) => {
try {
const gasPrices = await getGasPricesInGwei();
const gasAlerts: GasAlert[] = await redisClient
.hVals('gas-alerts')
.then((values) => values.map(val => JSON.parse(val)));
gasAlerts.forEach(async (gasAlert) => {
if (gasPrices.average <= gasAlert.threshold) {
if (gasPrices.fast <= gasAlert.threshold) {
const channel = await client.channels.fetch(gasAlert.channelId) as TextChannel;
const user = await client.users.fetch(gasAlert.userId);
@ -39,7 +37,6 @@ Gas prices have fallen below your alert threshold of ${gasAlert.threshold} Gwei:
} catch (error) {
console.log(`Error checking gas prices:\n`, error);
}
}, 15000);
};
export { createGasPriceChecker };
export { createGasAlertChecker };

View File

@ -1,12 +1,15 @@
import { ChatInputCommandInteraction, EmbedBuilder } from 'discord.js';
import redisClient from './redis';
import { getGasPricesInGwei } from './blockchain';
import { GasAlert } from '../types/gasAlert';
import { GasPrices } from '../types/gasPrices';
// Respond to the "/gas" command
const handleGasCommand = async (interaction: ChatInputCommandInteraction): Promise<void> => {
const gasPrices = await getGasPricesInGwei();
const gasPricesStr = await redisClient.get('current-prices');
if (gasPricesStr) {
const gasPrices = JSON.parse(gasPricesStr) as GasPrices;
console.log(`Replying to command "/gas": \n`, gasPrices);
@ -16,6 +19,10 @@ const handleGasCommand = async (interaction: ChatInputCommandInteraction): Promi
.setDescription(`${gasPrices.fast} ⦚⦚ 🚶 ${gasPrices.average} ⦚⦚ 🐢 ${gasPrices.slow}`)
await interaction.reply({ embeds: [embed] });
} else {
console.log(`Error fetching gas prices!`);
await interaction.reply('ERROR FETCHING GAS!!!1!')
}
};
// Respond to the "/gas alert ${gwei}" command

View File

@ -9,19 +9,30 @@ import fs from 'node:fs';
import path from 'node:path';
import { ActivityType, GatewayIntentBits } from 'discord.js';
import { getGasPricesInGwei, subToBlockHeaders } from './blockchain';
import { averageGasPricesObservable } from './blockchain';
import { deployCommands } from './deploy';
import { DiscordClient } from './discordClient';
import { createGasPriceChecker } from './gasPriceChecker';
import { createGasAlertChecker } from './gasAlertChecker';
import redisClient from './redis';
import { GasAlert } from '../types/gasAlert';
import { GasPrices } from '../types/gasPrices';
const token = process.env.DISCORD_BOT_TOKEN || "";
// Create a new Discord client
const client = new DiscordClient({ intents: [GatewayIntentBits.Guilds, GatewayIntentBits.GuildMessages] });
const doAlerts = createGasAlertChecker(client);
const setDiscordStatus = async ({ average, fast, slow }: GasPrices) => {
if (client.user) {
client.user.setActivity(
`${fast} ⦚ 🚶${average} ⦚ 🐢${slow}`
, { type: ActivityType.Watching });
}
}
// Load bot commands
const commandsPath = path.join(__dirname, 'commands');
const commandFiles = fs.readdirSync(commandsPath).filter(file => file.endsWith('.js'));
@ -36,6 +47,7 @@ for (const file of commandFiles) {
}
}
// Load bot events
const eventsPath = path.join(__dirname, 'events');
const eventFiles = fs.readdirSync(eventsPath).filter(file => file.endsWith('.js'));
@ -63,18 +75,14 @@ client.login(token)
.catch((reason) => console.log("Error connecting to redis!\n", reason))
})
.then(async () => {
const setDiscordStatus = async () => {
if (client.user) {
const { average, fast, slow } = await getGasPricesInGwei();
client.user.setActivity(
`${fast} ⦚ 🚶${average} ⦚ 🐢${slow}`
, { type: ActivityType.Watching });
}
}
// Start listening to blockchain
subToBlockHeaders(setDiscordStatus);
averageGasPricesObservable.subscribe({
next: async (averageGasPrices) => {
await redisClient.set('current-prices', JSON.stringify(averageGasPrices))
await setDiscordStatus(averageGasPrices);
await doAlerts(averageGasPrices);
},
error: console.error,
complete: () => console.log("Blockchain data stream closed")
})
.then(() => {
// Start the gas price checker
createGasPriceChecker(client);
});