Add typst and tinymist to shell

Start writing notes

Notes on Uniswap V2's optimum

Fix error in formula

Test `computeAmountInt` using various deltas

Add `concurrency` to the default configuration file

Remove unused imports

Correctly propagate error

Allow dead code

Make the priority queue a real FIFO

Refactor: remove priority queue as stream and use channels

Increase buffer size

New `flashArbitrage` function

Comment with some ideas

Add pragma version

Refactor: decrease the amount of calls

Remove unused code

Re-enable tests

Remove comment

Process known pairs when started

Avoid re-allocating a new provider every time

Ignore `nixos.qcow2` file created by the VM

Add support for `aarch64-linux`

Add NixOS module and VM configuration

Add `itertools`

Add arbitrage opportunity detection

Implement `fallback` method for non standard callbacks

Add more logs

Fix sign error in optimum formula

Add deployment scripts and `agenix-shell` secrets

Bump cargo packages

Fix typo

Print out an error if processing a pair goes wrong

Add `actionlint` to formatters

Fix typo

Add TODO comment

Remove not relevant anymore comment

Big refactor

- process actions always in the correct order avoiding corner cases
- avoid using semaphores

New API key

Add `age` to dev shell

Used by Emacs' `agenix-mode` on my system

Fix parametric deploy scripts

Add `run-forge-tests` flake app

Remove fork URL from Solidity source

Remove `pairDir` argument

Add link to `ArbitrageManager`'s ABI

WIP
This commit is contained in:
Andrea Ciceri 2025-05-10 10:45:59 +02:00
parent 7a1e03ee7a
commit fb378c4931
No known key found for this signature in database
17 changed files with 1222 additions and 441 deletions

View file

@ -1,7 +1,9 @@
use std::{collections::HashMap, path::PathBuf, str::FromStr};
use alloy::primitives::U256;
use alloy::primitives::{aliases::U112, Address};
use itertools::Itertools;
use miette::{miette, Result};
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
@ -10,11 +12,11 @@ use log::{debug, info};
#[derive(Debug, Serialize, Deserialize)]
pub struct Pair {
token0: Address,
token1: Address,
reserve0: U112,
reserve1: U112,
factory: Address,
pub token0: Address,
pub token1: Address,
pub reserve0: U112,
pub reserve1: U112,
pub factory: Address,
}
#[derive(Debug, Eq, Hash, PartialEq)]
@ -68,6 +70,14 @@ impl<'de> Deserialize<'de> for AddressPair {
}
}
#[derive(Debug)]
pub struct ArbitrageOpportunity {
pair_a: Address,
pair_b: Address,
direction: bool, // true means token0 -> token1 -> token0
optimum: U256,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Pairs {
pairs: HashMap<Address, Pair>,
@ -97,10 +107,17 @@ impl Pairs {
}
}
fn get(&self, address: Address) -> Option<&Pair> {
#[allow(dead_code)]
pub fn get(&self, address: Address) -> Option<&Pair> {
self.pairs.get(&address)
}
pub fn get_tokens(&self, address: Address) -> Option<(Address, Address)> {
self.pairs
.get(&address)
.map(|pair| (pair.token0, pair.token1))
}
pub fn add(
&mut self,
pair: Address,
@ -133,11 +150,11 @@ impl Pairs {
info!("First time seeing pair {}, adding it", { pair });
match self.by_tokens.get_mut(&AddressPair(token0, token1)) {
Some(tokens) => {
tokens.push(pair);
Some(pairs) => {
pairs.push(pair);
info!(
"Already know {} pairs with tokens {:?} and {:?}",
tokens.len(),
pairs.len(),
token0,
token1
);
@ -161,6 +178,107 @@ impl Pairs {
let data = serde_json::to_string(&self).map_err(|e| miette!(e))?;
std::fs::write(filename, data).map_err(|e| miette!(e))?;
info!("{} Pairs saved to {:?}", self.pairs.len(), filename);
Ok(())
}
pub fn len(&self) -> usize {
self.pairs.len()
}
pub fn iter(&self) -> std::collections::hash_map::Iter<'_, Address, Pair> {
self.pairs.iter()
}
pub fn get_addresses(&self) -> Vec<Address> {
self.pairs.keys().cloned().collect()
}
pub fn get_reserves(&self, address: Address) -> Option<(U112, U112)> {
self.pairs
.get(&address)
.map(|pair| (pair.reserve0, pair.reserve1))
}
pub fn update_reserves(
&mut self,
address: Address,
reserve0: U112,
reserve1: U112,
) -> Result<()> {
if let Some(pair) = self.pairs.get_mut(&address) {
pair.reserve0 = reserve0;
pair.reserve1 = reserve1;
info!(
"Updated reserves for pair {}: reserve0: {}, reserve1: {}",
address, reserve0, reserve1
);
Ok(())
} else {
debug!("Pair {} not found", address);
Ok(()) // TODO return Err
}
}
// TODO at the moment we return all the opportunities, instead we should return only the two opportunities
// (token0 -> token1 -> token0 and token1 -> token0 -> token1) with the highest amountIn
// Remember: choosing an opportunity invalidates the other ones
pub fn look_for_opportunity(
&self,
token0: Address,
token1: Address,
) -> Vec<ArbitrageOpportunity> {
let mut opportunities: Vec<ArbitrageOpportunity> = Vec::new();
if let Some(pairs) = self.by_tokens.get(&AddressPair(token0, token1)) {
pairs.iter()
.permutations(2)
.any(|pairs| {
let pair_a = self.get(*pairs[0]).unwrap();
let pair_b = self.get(*pairs[1]).unwrap();
if let Some(optimum) = optimal_in(pair_a.reserve0, pair_a.reserve1, pair_b.reserve0, pair_b.reserve1) {
info!("Found arbitrage opportunity between pairs {} and {} swapping {} along token0 -> token1 -> token0", pairs[0], pairs[1], optimum);
opportunities.push(ArbitrageOpportunity{
pair_a: *pairs[0],
pair_b: *pairs[1],
direction: true,
optimum
});
}
if let Some(optimum) = optimal_in(pair_a.reserve1, pair_a.reserve0, pair_b.reserve1, pair_b.reserve0) {
info!("Found arbitrage opportunity between pairs {} and {} swapping {} along token1 -> token0 -> token1", pairs[0], pairs[1], optimum);
opportunities.push(ArbitrageOpportunity{
pair_a: *pairs[0],
pair_b: *pairs[1],
direction: false,
optimum
});
}
false
});
}
opportunities
}
}
fn optimal_in(x_a: U112, y_a: U112, x_b: U112, y_b: U112) -> Option<U256> {
let x_a = U256::from(x_a);
let x_b = U256::from(x_b);
let y_a = U256::from(y_a);
let y_b = U256::from(y_b);
let f = U256::from(997);
let ff = f.pow(U256::from(2));
let _1000 = U256::from(1000);
let _1000000 = U256::from(1000000);
let k = f * x_b / _1000 + ff / _1000 * x_a / _1000;
let phi = (ff * x_a * y_b * x_b * y_a / _1000000).root(2);
let psi = y_a * x_b;
if psi >= phi {
None
} else {
Some((phi - psi) / k)
}
}

View file

@ -1,44 +0,0 @@
mod pairs;
use alloy::primitives::Address;
use futures_util::Stream;
use log::debug;
use std::{
pin::Pin,
task::{Context, Poll},
};
#[derive(Debug)]
pub enum Action {
ProcessPair(Address),
}
pub struct PriorityQueue(pub Vec<Action>);
impl PriorityQueue {
pub fn new() -> Self {
PriorityQueue(Vec::new())
}
pub fn push(&mut self, action: Action) {
debug!("Adding action {:?} to the priority queue", action);
self.0.push(action);
}
}
impl Stream for PriorityQueue {
type Item = Action;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.0.pop() {
None => Poll::Ready(None),
Some(action) => {
debug!("Consuming action {:?} to the priority queue", action);
match action {
Action::ProcessPair(pair) => Poll::Ready(Some(Action::ProcessPair(pair))),
}
}
}
}
}

View file

@ -1,30 +1,28 @@
#[path = "pairs.rs"]
mod pairs;
#[path = "priority_queue.rs"]
mod priority_queue;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
#![allow(clippy::too_many_arguments)]
use crate::config::Config;
use alloy::{
eips::BlockNumberOrTag,
primitives::Address,
primitives::{aliases::U112, Address, U256},
providers::{
fillers::FillProvider, DynProvider, Provider, ProviderBuilder, RootProvider, WsConnect,
fillers::{BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller},
Provider, ProviderBuilder, RootProvider, WsConnect,
},
pubsub::PubSubFrontend,
rpc::types::Filter,
rpc::{client::RpcClient, types::Filter},
transports::layers::RetryBackoffLayer,
};
use futures_util::{stream, StreamExt};
use miette::{miette, Result};
use futures_util::StreamExt;
use log::{debug, info};
use miette::miette;
use std::{
collections::HashSet,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
#[path = "pairs.rs"]
mod pairs;
use pairs::Pairs;
use priority_queue::{Action, PriorityQueue};
use tokio::time::sleep;
use tokio::sync::mpsc;
alloy::sol!(
#[allow(missing_docs)]
@ -33,14 +31,27 @@ alloy::sol!(
"abi/IUniswapV2Pair.json"
);
async fn process_swaps(
ws: WsConnect,
priority_queue: Arc<Mutex<PriorityQueue>>,
) -> eyre::Result<()> {
let provider = ProviderBuilder::new().on_ws(ws).await.unwrap();
#[derive(Debug)]
pub enum Action {
ProcessNewPair(Address),
ProcessOldPair(Address, U256, U256, U256, U256),
}
type AlloyProvider = FillProvider<
JoinFill<
alloy::providers::Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
>,
RootProvider,
>;
async fn subscribe(
provider: Arc<AlloyProvider>,
pairs: Arc<Mutex<Pairs>>,
tx: mpsc::Sender<Action>,
) -> eyre::Result<()> {
let filter = Filter::new()
.event("Swap(address,uint256,uint256,uint256,uint256,address)")
.event("Swap(address,uint256,uint256,uint256,uint256,address)") // TODO manage also sync and skim
.from_block(BlockNumberOrTag::Latest);
let sub = provider.subscribe_logs(&filter).await?;
@ -55,61 +66,130 @@ async fn process_swaps(
info!("Processing block number {:?}", block_number);
}
priority_queue
.lock()
.unwrap()
.push(Action::ProcessPair(log.address()));
let IUniswapV2Pair::Swap {
sender: _,
amount0In,
amount1In,
amount0Out,
amount1Out,
to: _,
} = log.log_decode()?.inner.data;
let pair_address = log.address();
let pair_already_known = pairs.lock().unwrap().get(pair_address).is_some();
debug!("Event by pair {:?}", pair_address);
if pair_already_known {
tx.send(Action::ProcessOldPair(
pair_address,
amount0In,
amount1In,
amount0Out,
amount1Out,
))
.await?;
} else {
tx.send(Action::ProcessNewPair(pair_address)).await?;
}
}
Ok(())
}
async fn process_pair(
ws: WsConnect,
async fn process_new_pair(
pairs: Arc<Mutex<Pairs>>,
pair_address: Address,
provider: Arc<AlloyProvider>,
) -> eyre::Result<()> {
let provider = ProviderBuilder::new().on_ws(ws).await.unwrap();
let result: eyre::Result<()> = async {
let pair = IUniswapV2Pair::new(pair_address, provider.clone()); // todo can avoid che clone?
let token0 = pair.token0().call().await?._0;
let token1 = pair.token1().call().await?._0;
let reserve0 = pair.getReserves().call().await?.reserve0;
let reserve1 = pair.getReserves().call().await?.reserve1;
let factory = pair.factory().call().await?._0;
let pair = IUniswapV2Pair::new(pair_address, provider);
let token0 = pair.token0().call().await?._0;
let token1 = pair.token1().call().await?._0;
let reserve0 = pair.getReserves().call().await?.reserve0;
let reserve1 = pair.getReserves().call().await?.reserve1;
let factory = pair.factory().call().await?._0;
pairs
.lock()
.unwrap()
.add(pair_address, token0, token1, reserve0, reserve1, factory);
Ok(())
}
.await;
if let Err(e) = &result {
eprintln!("error adding the new pair {}: {}", pair_address, e);
}
result
}
fn process_old_pair(
pairs: Arc<Mutex<Pairs>>,
pair_address: Address,
amount0_in: U256,
amount1_in: U256,
amount0_out: U256,
amount1_out: U256,
) -> eyre::Result<()> {
let (reserve0, reserve1) = pairs.lock().unwrap().get_reserves(pair_address).unwrap();
pairs
.lock()
.unwrap()
.add(pair_address, token0, token1, reserve0, reserve1, factory);
.update_reserves(
pair_address,
reserve0 - U112::from(amount0_out) + U112::from(amount0_in),
reserve1 - U112::from(amount1_in) + U112::from(amount1_out),
)
.unwrap(); // TODO manage error
Ok(()) // TODO manage errors
}
async fn process_known_pairs(
pairs: Arc<Mutex<Pairs>>,
provider: Arc<AlloyProvider>,
) -> eyre::Result<()> {
let addresses = pairs.lock().unwrap().get_addresses();
let len = addresses.len();
info!("Recovering state of {:?} saved pairs", len);
for (i, address) in addresses.into_iter().enumerate() {
info!("Processing pair {}/{}: {:?}", i + 1, len, address);
let result: eyre::Result<()> = async {
let pair = IUniswapV2Pair::new(address, provider.clone());
let reserves = pair.getReserves().call().await?;
let reserve0 = reserves.reserve0;
let reserve1 = reserves.reserve1;
let _ = pairs
.lock()
.unwrap()
.update_reserves(address, reserve0, reserve1); // TODO manage error, should be ok however
Ok(())
}
.await;
if let Err(e) = &result {
eprintln!("Error processing pair {}: {}", address, e);
return result;
}
}
Ok(())
}
async fn consume_priority_queue(
ws: WsConnect,
pairs: Arc<Mutex<Pairs>>,
priority_queue: Arc<Mutex<PriorityQueue>>,
config: Config,
) {
let mut guard = priority_queue.lock().unwrap();
let actions: Vec<Action> = guard.0.drain(..).collect(); //move all actions to temporary vector in order to unlock the mutex
drop(guard); //release before the expensive operation
fn look_for_opportunity(pairs: Arc<Mutex<Pairs>>, involved_pairs: &HashSet<Address>) {
let pairs = pairs.lock().unwrap();
stream::iter(actions)
.for_each_concurrent(config.concurrency, |action| {
let pairs_clone = pairs.clone();
let ws = ws.clone();
async move {
match action {
Action::ProcessPair(pair_address) => {
info!("Processing pair: {:?}", pair_address);
process_pair(ws, pairs_clone, pair_address).await;
}
}
}
})
.await;
for pair_address in involved_pairs {
let (token0, token1) = pairs.get_tokens(*pair_address).unwrap();
let _opportunities = pairs.look_for_opportunity(token0, token1);
}
}
async fn manage_interruption(pairs: Arc<Mutex<Pairs>>, config: Config) -> eyre::Result<()> {
@ -126,25 +206,93 @@ async fn manage_interruption(pairs: Arc<Mutex<Pairs>>, config: Config) -> eyre::
std::process::exit(0);
}
pub fn run(config: Config) -> Result<()> {
let runtime = tokio::runtime::Runtime::new().unwrap();
pub fn run(config: Config) -> miette::Result<()> {
let runtime = tokio::runtime::Runtime::new().map_err(|e| miette!(e))?;
let pairs = Arc::new(Mutex::new(Pairs::new(&config.pairs_file)?));
let priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let ws = WsConnect::new(&config.endpoint);
let (tx, mut rx) = mpsc::channel::<Action>(5000);
runtime.block_on(async {
tokio::spawn(manage_interruption(pairs.clone(), config.clone()));
// process all the `Swap` events adding actions to the queue
tokio::spawn(process_swaps(ws.clone(), priority_queue.clone()));
let retry_layer = RetryBackoffLayer::new(50, 500, 100);
let client = RpcClient::builder()
.layer(retry_layer)
.ws(ws)
.await
.map_err(|e| miette!(e))?;
let provider = Arc::new(ProviderBuilder::new().on_client(client));
let signer: PrivateKeySigner = "".parse().unwrap();
info!("Subscribing to the events...");
tokio::spawn(subscribe(provider.clone(), pairs.clone(), tx.clone()));
info!("Processing known pairs...");
process_known_pairs(pairs.clone(), provider.clone())
.await
.map_err(|e| miette!(e))?;
info!("Finished processing known pairs...");
let mut queue_last_time_not_empty = Instant::now();
let mut block_processed = false;
let mut involved_pairs: HashSet<Address> = HashSet::new();
loop {
consume_priority_queue(ws.clone(), pairs.clone(), priority_queue.clone(), config.clone()).await;
let action = rx.try_recv();
debug!("The entire queue has been processed, waiting 100ms before checking if new actions are available...");
sleep(Duration::from_millis(100)).await;
if let Ok(action) = action {
queue_last_time_not_empty = Instant::now();
block_processed = false;
let len = rx.len();
debug!(
"Processing action {:?}, {:?} actions left",
action, len
);
match action {
Action::ProcessNewPair(pair_address) => {
process_new_pair(pairs.clone(), pair_address, provider.clone())
.await
.map_err(|e| miette!(e))?;
involved_pairs.insert(pair_address);
}
Action::ProcessOldPair(
pair_address,
amount0_in,
amount1_in,
amount0_out,
amount1_out,
) => {
process_old_pair(
pairs.clone(),
pair_address,
amount0_in,
amount1_in,
amount0_out,
amount1_out,
)
.map_err(|e| miette!(e))?;
involved_pairs.insert(pair_address);
}
}
} else {
if !block_processed && Instant::now().duration_since(queue_last_time_not_empty) > Duration::from_millis(50) {
info!("The actions queue has been empty for 100ms, we assume the entire block has been processed");
info!("Involved pairs: {:?}", involved_pairs);
look_for_opportunity(pairs.clone(), &involved_pairs);
block_processed = true;
involved_pairs.clear();
};
std::thread::sleep(Duration::from_millis(10));
}
}
});
Ok(())
#[allow(unreachable_code)]
Ok(())
})
}