Introducing Carbon: An Indexing Framework on Solana

Kellian Alex Vitré, Tech Lead @ SevenLabs

We're excited to announce Carbon, an easy-to-use indexing framework that simplifies how developers work with data on Solana. After two months of development at SevenLabs, we've created a solution developers can use to maintain indexes of their program accounts and activity, build real-time monitors of Solana protocols, and cover any use case that requires sourcing and processing account updates and transaction notifications.

The Problem We're Solving

Every Solana application needs reliable access to on-chain data, whether it's tracking program events, monitoring program accounts, or storing parsed historical transaction data for analysis. RPC provides neither a developer-friendly nor an optimized read layer for those use cases, which forces developers to build their own custom indexing and data-aggregation pipelines. There is a lack of education, infrastructure, and above all developer tooling for indexing on Solana. Developers just want their program accounts in a database and parsed event/instruction streams. Carbon solves this with an end-to-end pipeline for indexing Solana data, while its modular datasources cater to all budgets and requirements.

Key Principles

The Pipeline

At the core of Carbon is the Pipeline struct, which ties together datasources, account/instruction/transaction pipes, and metrics:

Rust
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
}
A pipeline is assembled through a builder, pairing each decoder with its processor:

Rust
Pipeline::builder()
    .datasource(my_datasource)
    .instruction(my_decoder, my_processor)
    .account(account_decoder, account_processor)
    .metrics(my_metrics)
    .build()?

Sourcing Data

Datasources implement a single trait: they stream updates into the pipeline until cancelled:

Rust
#[async_trait]
pub trait Datasource: Send + Sync {
    async fn consume(
        &self,
        sender: &tokio::sync::mpsc::UnboundedSender<Update>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()>;

    fn update_types(&self) -> Vec<UpdateType>;
}

#[derive(Debug, Clone)]
pub enum Update {
    Account(AccountUpdate),
    Transaction(TransactionUpdate),
    AccountDeletion(AccountDeletion),
}

#[derive(Debug, Clone)]
pub struct AccountUpdate {
    pub pubkey: solana_sdk::pubkey::Pubkey,
    pub account: solana_sdk::account::Account,
    pub slot: u64,
}

#[derive(Debug, Clone)]
pub struct TransactionUpdate {
    pub signature: solana_sdk::signature::Signature,
    pub transaction: solana_sdk::transaction::VersionedTransaction,
    pub meta: solana_transaction_status::TransactionStatusMeta,
    pub is_vote: bool,
    pub slot: u64,
}
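To illustrate the contract behind this trait, here is a simplified, dependency-free sketch of the consume loop. The real trait is async and tokio-based with cancellation support; the types below (Update, Datasource, MockDatasource) are local stand-ins, not Carbon's actual API:

```rust
use std::sync::mpsc;

// Local, simplified stand-in for Carbon's Update/AccountUpdate types.
#[derive(Debug, Clone)]
enum Update {
    Account { pubkey: String, lamports: u64, slot: u64 },
}

// Synchronous analogue of the Datasource contract: push updates into a
// channel until the source is exhausted (the real trait is async and
// takes a CancellationToken).
trait Datasource {
    fn consume(&self, sender: &mpsc::Sender<Update>) -> Result<(), String>;
}

struct MockDatasource;

impl Datasource for MockDatasource {
    fn consume(&self, sender: &mpsc::Sender<Update>) -> Result<(), String> {
        for slot in 0..3 {
            sender
                .send(Update::Account {
                    pubkey: "11111111111111111111111111111111".to_string(),
                    lamports: 1_000 + slot,
                    slot,
                })
                .map_err(|e| e.to_string())?;
        }
        Ok(())
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    MockDatasource.consume(&tx).expect("consume failed");
    drop(tx); // close the channel so the receiver loop ends

    // The pipeline side: drain updates and hand them to the matching pipes.
    for update in rx {
        println!("received: {:?}", update);
    }
}
```

The pipeline plays the receiver role here: it drains the channel and routes each Update variant to the corresponding account, transaction, or deletion pipes.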
Carbon ships with ready-made datasource crates at different cost and complexity points:

| Crate Name | Description | Affordability | Ease of Setup |
|---|---|---|---|
| carbon-rpc-block-subscribe-datasource | Uses blockSubscribe with the Solana WS JSON-RPC to listen to real-time on-chain transactions | Cheap (just RPC) | Easy |
| carbon-rpc-program-subscribe-datasource | Uses programSubscribe with the Solana WS JSON-RPC to listen to real-time on-chain account updates | Cheap (just RPC) | Easy |
| carbon-rpc-transaction-crawler-datasource | Crawls historical successful transactions for a specific address in reverse chronological order using the Solana JSON-RPC | Cheap (just RPC) | Easy |
| carbon-helius-atlas-ws-datasource | Utilizes the Helius Geyser-enhanced WebSocket for streaming account and transaction updates | Medium (Helius plan) | Medium |
| carbon-yellowstone-grpc-datasource | Subscribes to a Yellowstone gRPC Geyser plugin-enhanced full node to stream account and transaction updates | Expensive (Geyser full node) | Complex |
Each datasource ships as its own crate; add the one you need to your Cargo.toml:

Toml
carbon-yellowstone-grpc-datasource = { version = "0.1.2", git = "https://github.com/sevenlabs-hq/carbon.git" }
Then plug it into a pipeline, for example to stream account updates for a program:

Rust
use std::env;

// Adjust import paths to your Carbon version if needed.
use carbon_core::pipeline::Pipeline;
use carbon_rpc_program_subscribe_datasource::{Filters, RpcProgramSubscribe};
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();
    dotenv::dotenv().ok();
    Pipeline::builder()
        .datasource(RpcProgramSubscribe::new(
            // Websocket RPC url, usually starts with "wss://"
            env::var("RPC_WS_URL").unwrap_or_default(),
            Filters::new(
                MY_PROGRAM_ID,
                Some(RpcProgramAccountsConfig {
                    filters: None,
                    account_config: RpcAccountInfoConfig {
                        encoding: Some(UiAccountEncoding::Base64),
                        ..Default::default()
                    },
                    ..Default::default()
                }),
            ),
        ))
        .build()?
        .run()
        .await?;

    Ok(())
}

Decoding Data

Carbon's CLI generates decoders directly from a program's IDL:

Bash
$ carbon-cli parse --idl my_program.json --output ./src/decoders

Note: it is possible to build custom decoders without using our CLI by implementing the AccountDecoder or InstructionDecoder trait on a struct:

Rust
pub trait AccountDecoder<'a> {
    type AccountType;

    fn decode_account(
        &self,
        account: &'a solana_sdk::account::Account,
    ) -> Option<DecodedAccount<Self::AccountType>>;
}

pub trait InstructionDecoder<'a> {
    type InstructionType;

    fn decode_instruction(
        &self,
        instruction: &'a solana_sdk::instruction::Instruction,
    ) -> Option<DecodedInstruction<Self::InstructionType>>;
}
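As a rough sketch of what a hand-written account decoder looks like, a decoder typically checks a discriminator and then deserializes the remaining bytes. The types below (Account, DecodedAccount, CounterAccount, the discriminator value) are simplified, hypothetical stand-ins for the solana_sdk and Carbon definitions:

```rust
// Simplified stand-ins for solana_sdk::account::Account and Carbon's
// DecodedAccount -- illustrative only, not the real definitions.
struct Account {
    data: Vec<u8>,
}

struct DecodedAccount<T> {
    data: T,
}

trait AccountDecoder<'a> {
    type AccountType;
    fn decode_account(&self, account: &'a Account) -> Option<DecodedAccount<Self::AccountType>>;
}

// Hypothetical program account: an 8-byte discriminator followed by a u64 counter.
#[derive(Debug, PartialEq)]
struct CounterAccount {
    count: u64,
}

const COUNTER_DISCRIMINATOR: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];

struct CounterDecoder;

impl<'a> AccountDecoder<'a> for CounterDecoder {
    type AccountType = CounterAccount;

    fn decode_account(&self, account: &'a Account) -> Option<DecodedAccount<CounterAccount>> {
        // Reject accounts that are too short or carry a different discriminator:
        // returning None tells the pipeline this account isn't ours.
        if account.data.len() < 16 || account.data[..8] != COUNTER_DISCRIMINATOR {
            return None;
        }
        let count = u64::from_le_bytes(account.data[8..16].try_into().ok()?);
        Some(DecodedAccount { data: CounterAccount { count } })
    }
}

fn main() {
    let mut data = COUNTER_DISCRIMINATOR.to_vec();
    data.extend_from_slice(&42u64.to_le_bytes());
    let decoded = CounterDecoder.decode_account(&Account { data }).unwrap();
    println!("count = {}", decoded.data.count); // count = 42
}
```

Returning None for non-matching accounts is what lets several decoders coexist on one pipeline: each pipe simply skips updates its decoder doesn't recognize.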

Processing Data

Processors receive typed input from their decoders; the input type depends on the kind of pipe:

Rust
/// For Account Processors
type InputType = (
    (carbon_core::account::AccountMetadata, solana_sdk::account::Account),
    carbon_core::account::DecodedAccount<T>
);

/// For Instruction Processors
type InputType = (
    carbon_core::instruction::InstructionMetadata,
    carbon_core::instruction::DecodedInstruction<T>,
    Vec<carbon_core::instruction::NestedInstruction>
);
Decoded instructions can also expose their accounts by name through the ArrangeAccounts trait:

Rust
pub trait ArrangeAccounts {
    type ArrangedAccounts;

    fn arrange_accounts(
        &self,
        accounts: Vec<solana_sdk::instruction::AccountMeta>,
    ) -> Option<Self::ArrangedAccounts>;
}
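Implementing the trait amounts to mapping the instruction's positional account list into a named struct. The sketch below uses a simplified stand-in for solana_sdk::instruction::AccountMeta and a hypothetical Buy instruction whose on-chain account order is [mint, user]:

```rust
// Simplified stand-in for solana_sdk::instruction::AccountMeta.
#[derive(Debug, Clone, PartialEq)]
struct AccountMeta {
    pubkey: String,
    is_signer: bool,
    is_writable: bool,
}

trait ArrangeAccounts {
    type ArrangedAccounts;
    fn arrange_accounts(&self, accounts: Vec<AccountMeta>) -> Option<Self::ArrangedAccounts>;
}

// Hypothetical Buy instruction whose account order is: [mint, user].
struct Buy;

#[derive(Debug, PartialEq)]
struct BuyAccounts {
    mint: String,
    user: String,
}

impl ArrangeAccounts for Buy {
    type ArrangedAccounts = BuyAccounts;

    // Map positional account metas into named fields; None if too few accounts.
    fn arrange_accounts(&self, accounts: Vec<AccountMeta>) -> Option<BuyAccounts> {
        let mut iter = accounts.into_iter();
        Some(BuyAccounts {
            mint: iter.next()?.pubkey,
            user: iter.next()?.pubkey,
        })
    }
}

fn main() {
    let metas = vec![
        AccountMeta { pubkey: "Mint111".into(), is_signer: false, is_writable: false },
        AccountMeta { pubkey: "User111".into(), is_signer: true, is_writable: true },
    ];
    let arranged = Buy.arrange_accounts(metas).unwrap();
    println!("buyer: {}", arranged.user); // buyer: User111
}
```

This is what makes calls like `buy.arrange_accounts(...)` in the processor below possible: instead of indexing into a raw account list, the processor reads named fields.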
For example, a processor handling decoded Pumpfun instructions:

Rust
#[async_trait]
impl Processor for PumpfunInstructionProcessor {
    type InputType = (
        InstructionMetadata,
        DecodedInstruction<PumpfunInstruction>,
        Vec<NestedInstruction>,
    );

    async fn process(
        &mut self,
        data: Self::InputType,
        _metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let pumpfun_instruction: PumpfunInstruction = data.1.data;

        match pumpfun_instruction {
            PumpfunInstruction::CreateEvent(create_event) => {
                println!("\nNew token created: {:#?}", create_event);
            }
            PumpfunInstruction::Buy(buy) => {
                let accounts = buy.arrange_accounts(data.1.accounts).unwrap();
                println!("\nBuy: {:#?}, Buyer: {:?}", buy, accounts.user);
            }
            _ => {}
        };

        Ok(())
    }
}

Monitoring

Carbon includes a comprehensive metrics system that helps you monitor your indexing pipeline in production. The framework currently provides two built-in implementations:

  1. Console Logging (LogMetrics)

Real-time console output with key statistics:

Plaintext
00:05:23 (+5s) | 1000 processed (95%), 950 successful, 50 failed (5%),
100 in queue, avg: 15ms, min: 5ms, max: 50ms

  2. Prometheus Integration (PrometheusMetrics)

The metrics system tracks three types of measurements:

Rust
pub trait Metrics: Send + Sync {
    // Track values that can fluctuate (e.g., queue size)
    async fn update_gauge(&self, name: &str, value: f64) -> CarbonResult<()>;

    // Count occurrences (e.g., processed updates)
    async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()>;

    // Measure distributions (e.g., processing times)
    async fn record_histogram(&self, name: &str, value: f64) -> CarbonResult<()>;
}
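The difference between the three measurement kinds can be shown with a dependency-free, synchronous sketch. The real trait is async; InMemoryMetrics and its methods here are local illustrations, not Carbon's implementation:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Synchronous, in-memory analogue of the Metrics trait, for illustration.
#[derive(Default)]
struct InMemoryMetrics {
    gauges: Mutex<HashMap<String, f64>>,
    counters: Mutex<HashMap<String, u64>>,
    histograms: Mutex<HashMap<String, Vec<f64>>>,
}

impl InMemoryMetrics {
    // Gauges overwrite: they track a current level, e.g. queue size.
    fn update_gauge(&self, name: &str, value: f64) {
        self.gauges.lock().unwrap().insert(name.to_string(), value);
    }

    // Counters accumulate: they count occurrences, e.g. processed updates.
    fn increment_counter(&self, name: &str, value: u64) {
        *self.counters.lock().unwrap().entry(name.to_string()).or_insert(0) += value;
    }

    // Histograms record every sample so distributions can be summarized later.
    fn record_histogram(&self, name: &str, value: f64) {
        self.histograms.lock().unwrap().entry(name.to_string()).or_default().push(value);
    }

    fn average(&self, name: &str) -> Option<f64> {
        let map = self.histograms.lock().unwrap();
        let samples = map.get(name)?;
        if samples.is_empty() {
            return None;
        }
        Some(samples.iter().sum::<f64>() / samples.len() as f64)
    }
}

fn main() {
    let metrics = InMemoryMetrics::default();
    metrics.update_gauge("queue_size", 100.0);
    metrics.increment_counter("updates_processed", 950);
    metrics.increment_counter("updates_processed", 50);
    for ms in [5.0, 15.0, 50.0] {
        metrics.record_histogram("process_time_ms", ms);
    }
    println!("avg process time: {:?} ms", metrics.average("process_time_ms"));
}
```

Summaries like the avg/min/max in the LogMetrics console line above are derived from exactly these recorded histogram samples.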

Key metrics tracked out of the box:

Custom Monitoring

You can implement your own monitoring solution by implementing the Metrics trait:

Rust
#[async_trait]
impl Metrics for MyCustomMetrics {
    async fn initialize(&self) -> CarbonResult<()> {
        // Set up your metrics system here
        Ok(())
    }

    async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()> {
        // Track your counters here
        Ok(())
    }

    // Implement the other required methods (update_gauge, record_histogram, ...)
}

// Use it in your pipeline
Pipeline::builder()
    .metrics(Arc::new(MyCustomMetrics::new()))
    .build()?

This modular approach to metrics allows you to:

Benefits of Using Carbon

What's Next?

We're actively working on:

Get Involved

Carbon is open for contributions! Whether you're building new datasources, decoders, or just have ideas for improvements, we'd love to hear from you.

Try Carbon today and let us know what you think! We're excited to see what you'll build with it.