Incomplete prototype

Things

WIP

Format

Work in progress

Work in progres

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress

Work in progress
This commit is contained in:
Andrea Ciceri 2025-03-25 17:07:20 +01:00
commit 7a1e03ee7a
No known key found for this signature in database
19 changed files with 7242 additions and 0 deletions

51
offchain/src/config.rs Normal file
View file

@ -0,0 +1,51 @@
use kdl::KdlDocument;
use miette::{miette, Result};
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub struct Config {
pub endpoint: String,
pub pairs_file: PathBuf,
pub concurrency: usize,
}
impl Config {
pub fn parse_config(config_file: PathBuf) -> Result<Config> {
let config_str = std::fs::read_to_string(config_file).map_err(|e| miette!(e))?;
let doc = config_str.parse::<KdlDocument>()?;
let endpoint = doc
.get("endpoint")
.ok_or(miette!("Missing 'endpoint'"))?
.get(0)
.ok_or(miette!("'endpoint' has no value"))?
.as_string()
.ok_or(miette!("'endpoint' value must be a string"))?
.to_string();
let pairs_file = PathBuf::from(
doc.get("pairs_file")
.ok_or(miette!("Missing 'pairs_file'"))?
.get(0)
.ok_or(miette!("'pairs_file' has no value"))?
.as_string()
.ok_or(miette!("'pairs_file' value must be a string"))?
.to_string(),
);
let concurrency =
doc.get("concurrency")
.ok_or(miette!("Missing 'concurrency'"))?
.get(0)
.ok_or(miette!("'concurrency' has no value"))?
.as_integer()
.ok_or(miette!("'concurrency' value must be an integer"))? as usize;
Ok(Self {
endpoint,
pairs_file,
concurrency,
})
}
}

65
offchain/src/main.rs Normal file
View file

@ -0,0 +1,65 @@
use clap::Parser;
use log::LevelFilter;
use std::path::PathBuf;
mod config;
mod run;
/// Arbitrage bot
#[derive(Parser, Debug)]
#[command(version, author, about)]
struct Args {
/// Configuration file
#[arg(short = 'c', long = "config", env = "ARBI_CONFIG")]
config: PathBuf,
/// Log level
#[arg(long = "log-level", env = "ARBI_LOG_LEVEL", default_value = "info")]
log_level: String,
#[command(subcommand)]
command: Command,
}
#[derive(Parser, Debug)]
enum Command {
// Run the arbitrage bot
Run {},
// Check the configuration file
CheckConfig {},
}
fn main() -> miette::Result<()> {
let args = Args::parse();
let log_level_filter = match args.log_level.to_lowercase().as_str() {
"debug" => LevelFilter::Debug,
"trace" => LevelFilter::Trace,
"warn" => LevelFilter::Warn,
"error" => LevelFilter::Error,
"info" => LevelFilter::Info,
_ => LevelFilter::Info, // Default to info
};
env_logger::Builder::new()
.filter_level(log_level_filter)
.init();
match args.command {
Command::Run {} => {
let config = config::Config::parse_config(args.config)?;
run::run(config)?;
Ok(())
}
Command::CheckConfig {} => {
let config = config::Config::parse_config(args.config)?;
println!("Configuration correctly parsed\n{:#?}", config);
Ok(())
}
}
}

166
offchain/src/pairs.rs Normal file
View file

@ -0,0 +1,166 @@
use std::{collections::HashMap, path::PathBuf, str::FromStr};
use alloy::primitives::{aliases::U112, Address};
use miette::{miette, Result};
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use log::{debug, info};
#[derive(Debug, Serialize, Deserialize)]
pub struct Pair {
token0: Address,
token1: Address,
reserve0: U112,
reserve1: U112,
factory: Address,
}
#[derive(Debug, Eq, Hash, PartialEq)]
struct AddressPair(Address, Address);
impl Serialize for AddressPair {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(format!("{:?}:{:?}", self.0, self.1).as_str())
}
}
impl<'de> Deserialize<'de> for AddressPair {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct AddressPairVisitor;
impl Visitor<'_> for AddressPairVisitor {
type Value = AddressPair;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string containing two addresses separated by a colon (e.g. \"0x...0:0x...0\")")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
let parts: Vec<&str> = value.splitn(2, ':').map(str::trim).collect();
if parts.len() != 2 {
return Err(E::custom(format!(
"Invalid format, expected 'address1:address2' but got \"{}\"",
value
)));
}
let token0 = Address::from_str(parts[0])
.map_err(|e| E::custom(format!("Error parsing the address \"{}\"", e)))?;
let token1 = Address::from_str(parts[1])
.map_err(|e| E::custom(format!("Error parsing the address \"{}\"", e)))?;
Ok(AddressPair(token0, token1))
}
}
deserializer.deserialize_str(AddressPairVisitor)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Pairs {
pairs: HashMap<Address, Pair>,
by_tokens: HashMap<AddressPair, Vec<Address>>,
}
impl Pairs {
pub fn new(filename: &PathBuf) -> Result<Self> {
debug!("Looking for pairs file in {:?}", filename);
let path = std::path::Path::new(&filename);
if path.exists() {
info!("Found existing {:?}", filename);
let data = std::fs::read_to_string(path).map_err(|e| miette!(e))?;
serde_json::from_str(&data).map_err(|e| miette!(e))
} else {
info!(
"File {:?} doesn't exist, creating new structure from scratch",
filename
);
Ok(Pairs {
pairs: HashMap::new(),
by_tokens: HashMap::new(),
})
}
}
fn get(&self, address: Address) -> Option<&Pair> {
self.pairs.get(&address)
}
pub fn add(
&mut self,
pair: Address,
token0: Address,
token1: Address,
reserve0: U112,
reserve1: U112,
factory: Address,
) -> Option<()> {
let old = self.pairs.insert(
pair,
Pair {
token0,
token1,
reserve0,
reserve1,
factory,
},
);
match old {
Some(old) => {
info!("Pair {} already present, updating reserves (reserve0: {} -> {}; reserve1: {} -> {})",
pair, old.reserve0, reserve0, old.reserve1, reserve1
);
Some(())
}
None => {
info!("First time seeing pair {}, adding it", { pair });
match self.by_tokens.get_mut(&AddressPair(token0, token1)) {
Some(tokens) => {
tokens.push(pair);
info!(
"Already know {} pairs with tokens {:?} and {:?}",
tokens.len(),
token0,
token1
);
}
None => {
self.by_tokens
.insert(AddressPair(token0, token1), vec![pair]);
info!(
"It's the first pair with tokens {:?} and {:?}",
token0, token1
);
}
}
None
}
}
}
pub fn dump(&self, filename: &PathBuf) -> Result<()> {
let data = serde_json::to_string(&self).map_err(|e| miette!(e))?;
std::fs::write(filename, data).map_err(|e| miette!(e))?;
Ok(())
}
}

View file

@ -0,0 +1,44 @@
mod pairs;
use alloy::primitives::Address;
use futures_util::Stream;
use log::debug;
use std::{
pin::Pin,
task::{Context, Poll},
};
#[derive(Debug)]
pub enum Action {
ProcessPair(Address),
}
pub struct PriorityQueue(pub Vec<Action>);
impl PriorityQueue {
pub fn new() -> Self {
PriorityQueue(Vec::new())
}
pub fn push(&mut self, action: Action) {
debug!("Adding action {:?} to the priority queue", action);
self.0.push(action);
}
}
impl Stream for PriorityQueue {
type Item = Action;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.0.pop() {
None => Poll::Ready(None),
Some(action) => {
debug!("Consuming action {:?} to the priority queue", action);
match action {
Action::ProcessPair(pair) => Poll::Ready(Some(Action::ProcessPair(pair))),
}
}
}
}
}

150
offchain/src/run.rs Normal file
View file

@ -0,0 +1,150 @@
#[path = "pairs.rs"]
mod pairs;
#[path = "priority_queue.rs"]
mod priority_queue;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use crate::config::Config;
use alloy::{
eips::BlockNumberOrTag,
primitives::Address,
providers::{
fillers::FillProvider, DynProvider, Provider, ProviderBuilder, RootProvider, WsConnect,
},
pubsub::PubSubFrontend,
rpc::types::Filter,
};
use futures_util::{stream, StreamExt};
use miette::{miette, Result};
use log::{debug, info};
use pairs::Pairs;
use priority_queue::{Action, PriorityQueue};
use tokio::time::sleep;
alloy::sol!(
#[allow(missing_docs)]
#[sol(rpc)]
IUniswapV2Pair,
"abi/IUniswapV2Pair.json"
);
async fn process_swaps(
ws: WsConnect,
priority_queue: Arc<Mutex<PriorityQueue>>,
) -> eyre::Result<()> {
let provider = ProviderBuilder::new().on_ws(ws).await.unwrap();
let filter = Filter::new()
.event("Swap(address,uint256,uint256,uint256,uint256,address)")
.from_block(BlockNumberOrTag::Latest);
let sub = provider.subscribe_logs(&filter).await?;
let mut stream = sub.into_stream();
let mut latest_block = 0;
while let Some(log) = stream.next().await {
let block_number = log.block_number.unwrap();
if block_number > latest_block {
latest_block = block_number;
info!("Processing block number {:?}", block_number);
}
priority_queue
.lock()
.unwrap()
.push(Action::ProcessPair(log.address()));
}
Ok(())
}
async fn process_pair(
ws: WsConnect,
pairs: Arc<Mutex<Pairs>>,
pair_address: Address,
) -> eyre::Result<()> {
let provider = ProviderBuilder::new().on_ws(ws).await.unwrap();
let pair = IUniswapV2Pair::new(pair_address, provider);
let token0 = pair.token0().call().await?._0;
let token1 = pair.token1().call().await?._0;
let reserve0 = pair.getReserves().call().await?.reserve0;
let reserve1 = pair.getReserves().call().await?.reserve1;
let factory = pair.factory().call().await?._0;
pairs
.lock()
.unwrap()
.add(pair_address, token0, token1, reserve0, reserve1, factory);
Ok(())
}
async fn consume_priority_queue(
ws: WsConnect,
pairs: Arc<Mutex<Pairs>>,
priority_queue: Arc<Mutex<PriorityQueue>>,
config: Config,
) {
let mut guard = priority_queue.lock().unwrap();
let actions: Vec<Action> = guard.0.drain(..).collect(); //move all actions to temporary vector in order to unlock the mutex
drop(guard); //release before the expensive operation
stream::iter(actions)
.for_each_concurrent(config.concurrency, |action| {
let pairs_clone = pairs.clone();
let ws = ws.clone();
async move {
match action {
Action::ProcessPair(pair_address) => {
info!("Processing pair: {:?}", pair_address);
process_pair(ws, pairs_clone, pair_address).await;
}
}
}
})
.await;
}
async fn manage_interruption(pairs: Arc<Mutex<Pairs>>, config: Config) -> eyre::Result<()> {
tokio::signal::ctrl_c()
.await
.expect("Error receiving the signal");
info!("Program correctly interrupted by the user");
match pairs.lock().unwrap().dump(&config.pairs_file) {
Ok(_) => info!("Pairs correctly dumped to {:?}", &config.pairs_file),
Err(e) => info!("Error dumping pairs to {:?} {:?}", &config.pairs_file, e),
}
std::process::exit(0);
}
pub fn run(config: Config) -> Result<()> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let pairs = Arc::new(Mutex::new(Pairs::new(&config.pairs_file)?));
let priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let ws = WsConnect::new(&config.endpoint);
runtime.block_on(async {
tokio::spawn(manage_interruption(pairs.clone(), config.clone()));
// process all the `Swap` events adding actions to the queue
tokio::spawn(process_swaps(ws.clone(), priority_queue.clone()));
loop {
consume_priority_queue(ws.clone(), pairs.clone(), priority_queue.clone(), config.clone()).await;
debug!("The entire queue has been processed, waiting 100ms before checking if new actions are available...");
sleep(Duration::from_millis(100)).await;
}
});
Ok(())
}