Example: compare_pending_txs

Example

To run this example:

  • Clone the examples repository: git clone git@github.com:alloy-rs/examples.git
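  • Run: cargo run --example compare_pending_txs -- -r <name>:<url> -r <name>:<url> ... (each URL must be a websocket or IPC endpoint; http endpoints are skipped)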
//! Example of comparing pending transactions from multiple providers.

use alloy::{
    network::AnyNetwork,
    providers::{Provider, ProviderBuilder},
};
use chrono::Utc;
use clap::Parser;
use eyre::Result;
use futures_util::StreamExt;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;

// Command-line arguments for this example, parsed through clap's derive API.
// NOTE: plain `//` comments are used on purpose; clap's derive picks up `///`
// doc comments as help/about text, which would change the program's output.
#[derive(Debug, Parser)]
struct Cli {
    // Values collected from the `-r` option. Per the help text, each value is
    // expected to look like `<name>:<url>`. They are stored as raw strings;
    // splitting into name and URL is left to the code that consumes `rpcs`.
    #[clap(short = 'r', help = "rpcs to connect to, usage: -r <name>:<url> -r <name>:<url> ...")]
    rpcs: Vec<String>,
}

/// Compares how quickly different RPC endpoints announce the same pending transactions.
///
/// Steps:
/// 1. Parse every `-r <name>:<url>` argument; malformed values and `http` URLs are skipped with
///    a message on stderr (a subscription needs a websocket/ipc endpoint).
/// 2. For each remaining endpoint, connect, subscribe to pending transaction hashes and spawn a
///    task that forwards the first 50 hashes, tagged with the endpoint name and the local
///    receive time, into a shared channel.
/// 3. A single tracker task consumes the channel and, once a hash has been reported by every
///    subscribed endpoint, prints each endpoint's delay relative to the first sighting.
///
/// # Errors
///
/// Returns an error if any spawned task panicked or was cancelled. Connection and subscription
/// failures are not fatal: the affected endpoint is reported on stderr and skipped.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();

    // Split each argument at the first ':' so that the URL keeps its own "scheme://..." colons.
    let mut rpcs = vec![];
    for arg in &cli.rpcs {
        let (name, url) = match arg.split_once(':') {
            Some(pair) => pair,
            None => {
                // Previously such values vanished silently; tell the user why nothing happens.
                eprintln!("skipping {} because it is not of the form <name>:<url>", arg);
                continue;
            }
        };
        if url.starts_with("http") {
            eprintln!("skipping {} at {} because it is not a websocket/ipc endpoint", name, url);
            continue;
        }
        rpcs.push((name, url));
    }

    // Number of endpoints that were subscribed successfully; a transaction is reported once
    // this many distinct streams have delivered it.
    let mut total_streams = 0;
    let mut tasks = vec![];
    let (sx, mut rx) = mpsc::unbounded_channel();
    for (name, url) in &rpcs {
        let sx = sx.clone();
        // One `Arc` per stream: cheap to clone per message and usable as the stream's identity.
        let name = Arc::new(name.to_string());
        let url = url.to_string();

        let provider = match ProviderBuilder::new().network::<AnyNetwork>().on_builtin(&url).await {
            Ok(provider) => provider,
            Err(e) => {
                eprintln!("skipping {} at {} because of error: {}", name, url, e);
                continue;
            }
        };

        let mut stream = match provider.subscribe_pending_transactions().await {
            // Cap every stream at 50 hashes so the example terminates on its own.
            Ok(stream) => stream.into_stream().take(50),
            Err(e) => {
                eprintln!("skipping {} at {} because of error: {}", name, url, e);
                continue;
            }
        };

        total_streams += 1;

        tasks.push(tokio::spawn(async move {
            let _p = provider; // keep provider alive
            while let Some(tx_hash) = stream.next().await {
                if let Err(e) = sx.send((name.clone(), tx_hash, Utc::now())) {
                    eprintln!("sending to channel failed: {}", e);
                    // The receiver is gone, so every further send would fail the same way.
                    break;
                }
            }
        }));
    }

    // Drop the original sender: the tracker's `recv()` only yields `None` once *all* senders
    // are gone, and the clones held by the stream tasks disappear when those tasks finish.
    drop(sx);

    let tracker_task = tokio::spawn(async move {
        #[derive(Debug)]
        struct TxTrack {
            first_seen: chrono::DateTime<Utc>,
            seen_by: Vec<(Arc<String>, chrono::DateTime<Utc>)>,
        }

        // Entries for hashes that never reach every endpoint stay here until the task ends;
        // that is bounded by the 50-item cap on each stream.
        let mut tracker: HashMap<_, TxTrack> = HashMap::new();
        while let Some((name, tx_hash, timestamp)) = rx.recv().await {
            let track = tracker.entry(tx_hash).or_insert_with(|| TxTrack {
                first_seen: timestamp,
                seen_by: Vec::with_capacity(total_streams),
            });

            // An endpoint may announce the same hash more than once; counting the repeat would
            // make `seen_by.len()` reach `total_streams` without every endpoint having seen it.
            if track.seen_by.iter().any(|(seen, _)| Arc::ptr_eq(seen, &name)) {
                continue;
            }
            track.seen_by.push((name, timestamp));

            if track.seen_by.len() == total_streams {
                let msg: String = track
                    .seen_by
                    .iter()
                    .map(|(name, seen_at)| {
                        format!(
                            "{} +{}ms ",
                            name,
                            (*seen_at - track.first_seen).num_milliseconds()
                        )
                    })
                    .collect();
                println!(
                    "pending tx #{} at {} - {}",
                    tx_hash,
                    track.first_seen.timestamp_millis(),
                    msg
                );
                tracker.remove(&tx_hash);
            }
        }
    });

    for task in tasks {
        task.await?;
    }

    // Wait for the tracker to drain the channel; returning earlier would shut the runtime down
    // and could cancel it before the last comparisons were printed.
    tracker_task.await?;

    Ok(())
}

Find the source code on GitHub, in the alloy-rs/examples repository.