Introducing Carbon: An Indexing Framework on Solana
We're excited to announce Carbon, an easy-to-use indexing framework that aims to simplify how developers work with data on Solana. After two months of development at SevenLabs, we've created a solution developers can use to maintain indexes of their program accounts and activity, build realtime monitors of Solana protocols, and cover any use case that requires sourcing and processing account updates and transaction notifications.
The Problem We're Solving
Every Solana application needs reliable access to on-chain data, whether it's tracking program events, monitoring program accounts, or storing parsed historical transaction data for analysis. RPC provides neither a developer-friendly nor an optimized read layer for those use cases, which forces developers to build their own custom indexing and data aggregation pipelines. There is a lack of education, infrastructure and, above all, developer tooling for indexing on Solana. Developers just want their program accounts in a database and parsed event/instruction streams. Carbon solves this with an end-to-end pipeline for indexing Solana data, catering to all budgets and requirements through modular data sources.
Key Principles
The Pipeline
At the core of Carbon is a robust pipeline system that efficiently processes updates through multiple stages.
It is built to handle different types of updates and route them through the appropriate processors.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
}
- You can use the builder pattern to construct your Pipeline.
Pipeline::builder()
    .datasource(my_datasource)
    .instruction(my_decoder, my_processor)
    .account(account_decoder, account_processor)
    .metrics(my_metrics)
    .build()?
- In order to run Carbon, call the run method on a built Pipeline, as in the sketch below.
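For example, reusing the placeholder components from the builder snippet above:
// Build the pipeline, then await run() to start consuming and processing updates.
Pipeline::builder()
    .datasource(my_datasource)
    .instruction(my_decoder, my_processor)
    .account(account_decoder, account_processor)
    .metrics(my_metrics)
    .build()?
    .run()
    .await?;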
Sourcing Data
- The Datasource trait can be implemented to send AccountUpdate(s) or TransactionUpdate(s) to the pipeline.
#[async_trait]
pub trait Datasource: Send + Sync {
    async fn consume(
        &self,
        sender: &tokio::sync::mpsc::UnboundedSender<Update>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()>;

    fn update_types(&self) -> Vec<UpdateType>;
}
#[derive(Debug, Clone)]
pub enum Update {
    Account(AccountUpdate),
    Transaction(TransactionUpdate),
    AccountDeletion(AccountDeletion),
}

#[derive(Debug, Clone)]
pub struct AccountUpdate {
    pub pubkey: solana_sdk::pubkey::Pubkey,
    pub account: solana_sdk::account::Account,
    pub slot: u64,
}

#[derive(Debug, Clone)]
pub struct TransactionUpdate {
    pub signature: solana_sdk::signature::Signature,
    pub transaction: solana_sdk::transaction::VersionedTransaction,
    pub meta: solana_transaction_status::TransactionStatusMeta,
    pub is_vote: bool,
    pub slot: u64,
}
A Datasource can provide historical data, realtime data, or both to the pipeline. The pipeline is not limited to a single source: multiple datasources can be consumed concurrently.
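If you do need a custom source, a minimal implementation might look like the sketch below: a hypothetical datasource that pushes a single hard-coded account update and exits. The carbon_core module paths and the UpdateType variant name used here are assumptions to verify against the crate docs.
use std::sync::Arc;

use async_trait::async_trait;
// Module paths and the UpdateType variant name are assumptions; check carbon_core's docs.
use carbon_core::{
    datasource::{AccountUpdate, Datasource, Update, UpdateType},
    error::CarbonResult,
    metrics::MetricsCollection,
};
use tokio_util::sync::CancellationToken;

// Hypothetical one-shot datasource used purely for illustration.
pub struct OneShotDatasource {
    pub pubkey: solana_sdk::pubkey::Pubkey,
    pub account: solana_sdk::account::Account,
    pub slot: u64,
}

#[async_trait]
impl Datasource for OneShotDatasource {
    async fn consume(
        &self,
        sender: &tokio::sync::mpsc::UnboundedSender<Update>,
        _cancellation_token: CancellationToken,
        _metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        // Forward a single update into the pipeline. A real datasource would
        // poll or subscribe in a loop until the cancellation token fires.
        let _ = sender.send(Update::Account(AccountUpdate {
            pubkey: self.pubkey,
            account: self.account.clone(),
            slot: self.slot,
        }));

        Ok(())
    }

    fn update_types(&self) -> Vec<UpdateType> {
        vec![UpdateType::AccountUpdate]
    }
}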
Important: implementing your own Datasource is rarely necessary; most of the time, you can choose from our list of already implemented, easy-to-plug datasources.
| Crate Name | Description | Affordability | Ease of Setup |
| --- | --- | --- | --- |
| carbon-rpc-block-subscribe-datasource | Uses blockSubscribe with the Solana WS JSON RPC to listen to real-time on-chain transactions | Cheap (just RPC) | Easy |
| carbon-rpc-program-subscribe-datasource | Uses programSubscribe with the Solana WS JSON RPC to listen to real-time on-chain account updates | Cheap (just RPC) | Easy |
| carbon-rpc-transaction-crawler-datasource | Crawls historical successful transactions for a specific address in reverse chronological order using the Solana JSON RPC | Cheap (just RPC) | Easy |
| carbon-helius-atlas-ws-datasource | Utilizes the Helius Geyser-enhanced WebSocket for streaming account and transaction updates | Medium (Helius Plan) | Medium |
| carbon-yellowstone-grpc-datasource | Subscribes to a Yellowstone gRPC Geyser plugin-enhanced full node to stream account and transaction updates | Expensive (Geyser Fullnode) | Complex |
- Important: carbon-helius-atlas-ws-datasource and carbon-yellowstone-grpc-datasource should be imported by specifying git in the Cargo.toml, like so:
carbon-yellowstone-grpc-datasource = { version = "0.1.2", git = "https://github.com/sevenlabs-hq/carbon.git" }
- Example: using carbon-rpc-program-subscribe-datasource to track account updates on your program.
use std::env;

// The carbon_core::pipeline path for Pipeline is the usual location; adjust if needed.
use carbon_core::pipeline::Pipeline;
use carbon_rpc_program_subscribe_datasource::{Filters, RpcProgramSubscribe};
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();
    dotenv::dotenv().ok();

    Pipeline::builder()
        .datasource(RpcProgramSubscribe::new(
            // Websocket RPC url, usually starts with "wss://"
            env::var("RPC_WS_URL").unwrap_or_default(),
            Filters::new(
                // MY_PROGRAM_ID is a placeholder for your program's ID
                MY_PROGRAM_ID,
                Some(RpcProgramAccountsConfig {
                    filters: None,
                    account_config: RpcAccountInfoConfig {
                        encoding: Some(UiAccountEncoding::Base64),
                        ..Default::default()
                    },
                    ..Default::default()
                }),
            ),
        ))
        .build()?
        .run()
        .await?;

    Ok(())
}
Decoding Data
- Decoders can be generated from any Anchor IDL with the CLI's parse command.
$ carbon-cli parse --idl my_program.json --output ./src/decoders
After running this command, you get named instruction account structs (via the ArrangeAccounts trait), decoded instruction data structs, and decoded account data structs (via the CarbonDeserialize trait), output directly into ./src/decoders and ready to be used. We have about a dozen program decoders already published as crates, ready to be used out of the box with a simple import.
Note: it is possible to build custom decoders without using our CLI by implementing the AccountDecoder or InstructionDecoder trait on a struct (a sketch follows the trait definitions below):
pub trait AccountDecoder<'a> {
    type AccountType;

    fn decode_account(
        &self,
        account: &'a solana_sdk::account::Account,
    ) -> Option<DecodedAccount<Self::AccountType>>;
}

pub trait InstructionDecoder<'a> {
    type InstructionType;

    fn decode_instruction(
        &self,
        instruction: &'a solana_sdk::instruction::Instruction,
    ) -> Option<DecodedInstruction<Self::InstructionType>>;
}
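As a rough illustration, a hand-written decoder for a hypothetical account holding a single little-endian u64 counter might look like the sketch below. The import path for AccountDecoder and the exact field layout of DecodedAccount (assumed here to mirror solana_sdk::account::Account plus the decoded data) should be checked against carbon_core.
// Import path is an assumption; DecodedAccount appears under carbon_core::account elsewhere.
use carbon_core::account::{AccountDecoder, DecodedAccount};

// Hypothetical account layout: a single little-endian u64 counter.
pub struct CounterAccount {
    pub count: u64,
}

pub struct CounterAccountDecoder;

impl<'a> AccountDecoder<'a> for CounterAccountDecoder {
    type AccountType = CounterAccount;

    fn decode_account(
        &self,
        account: &'a solana_sdk::account::Account,
    ) -> Option<DecodedAccount<Self::AccountType>> {
        // Return None if the account data doesn't match the expected layout.
        let bytes: [u8; 8] = account.data.get(..8)?.try_into().ok()?;

        Some(DecodedAccount {
            // Field layout assumed to mirror solana_sdk::account::Account.
            lamports: account.lamports,
            data: CounterAccount {
                count: u64::from_le_bytes(bytes),
            },
            owner: account.owner,
            executable: account.executable,
            rent_epoch: account.rent_epoch,
        })
    }
}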
Processing Data
- Once decoded, data flows into your pipes in a form that depends on the type of pipe; a minimal account processor sketch follows the snippet below.
/// For Account Processors
type InputType = (
    (carbon_core::account::AccountMetadata, solana_sdk::account::Account),
    carbon_core::account::DecodedAccount<T>,
);

/// For Instruction Processors
type InputType = (
    carbon_core::instruction::InstructionMetadata,
    carbon_core::instruction::DecodedInstruction<T>,
    Vec<carbon_core::instruction::NestedInstruction>,
);
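For illustration, a minimal account processor could look like the sketch below. MyProgramAccount stands in for the account type produced by your decoder, and the carbon_core module paths for Processor, CarbonResult and MetricsCollection, as well as the pubkey field on AccountMetadata, are assumptions to check against the crate.
use std::sync::Arc;

use async_trait::async_trait;
// Module paths below are assumptions; AccountMetadata and DecodedAccount are
// referenced under carbon_core::account in the snippet above.
use carbon_core::{
    account::{AccountMetadata, DecodedAccount},
    error::CarbonResult,
    metrics::MetricsCollection,
    processor::Processor,
};

// Placeholder for the account type your decoder produces.
#[derive(Debug)]
pub enum MyProgramAccount {
    Counter { count: u64 }, // hypothetical variant for illustration
}

pub struct MyAccountProcessor;

#[async_trait]
impl Processor for MyAccountProcessor {
    type InputType = (
        (AccountMetadata, solana_sdk::account::Account),
        DecodedAccount<MyProgramAccount>,
    );

    async fn process(
        &mut self,
        data: Self::InputType,
        _metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let ((metadata, _raw_account), decoded) = data;

        // Persist, forward or aggregate the decoded account; here we just log it.
        // AccountMetadata is assumed to expose the account's pubkey.
        println!("Account {} updated: {:#?}", metadata.pubkey, decoded.data);

        Ok(())
    }
}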
- Inside instruction processors, if the ArrangeAccounts trait is implemented, you can use the following method to access an easy-to-read accounts struct instead (ArrangedAccounts).
pub trait ArrangeAccounts {
    type ArrangedAccounts;

    fn arrange_accounts(
        &self,
        accounts: Vec<solana_sdk::instruction::AccountMeta>,
    ) -> Option<Self::ArrangedAccounts>;
}
You are then free to do whatever you need with the data: store it in a local or remote database, stream it to another of your services, or anything else your application requires.
Example: logging every time a new PumpFun curve is initialized or a user buys.
#[async_trait]
impl Processor for PumpfunInstructionProcessor {
    type InputType = (
        InstructionMetadata,
        DecodedInstruction<PumpfunInstruction>,
        Vec<NestedInstruction>,
    );

    async fn process(
        &mut self,
        data: Self::InputType,
        _metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let pumpfun_instruction: PumpfunInstruction = data.1.data;

        match pumpfun_instruction {
            PumpfunInstruction::CreateEvent(create_event) => {
                println!("\nNew token created: {:#?}", create_event);
            }
            PumpfunInstruction::Buy(buy) => {
                let accounts = buy.arrange_accounts(data.1.accounts).unwrap();
                println!("\nBuy: {:#?}, Buyer: {:?}", buy, accounts.user);
            }
            _ => {}
        };

        Ok(())
    }
}
Monitoring
Carbon includes a comprehensive metrics system that helps you monitor your indexing pipeline in production. The framework currently provides two built-in implementations:
- Console Logging (LogMetrics)
// Real-time console output with key statistics
00:05:23 (+5s) | 1000 processed (95%), 950 successful, 50 failed (5%),
100 in queue, avg: 15ms, min: 5ms, max: 50ms
- Prometheus Integration (PrometheusMetrics)
- Exposes metrics on port 9100
- Ready for integration with Grafana dashboards
- Industry-standard monitoring solution
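To use them, pass an instance to the pipeline builder; multiple metrics sinks can be attached at once. The crate names and constructors in the sketch below are assumptions based on the LogMetrics and PrometheusMetrics names; check the Carbon repository for the exact APIs.
use std::sync::Arc;

// Crate names and constructors are assumptions; verify against the Carbon repository.
use carbon_log_metrics::LogMetrics;
use carbon_prometheus_metrics::PrometheusMetrics;

Pipeline::builder()
    .datasource(my_datasource)
    // Console statistics, as shown above.
    .metrics(Arc::new(LogMetrics::new()))
    // Prometheus endpoint on port 9100, ready to be scraped.
    .metrics(Arc::new(PrometheusMetrics::new()))
    .build()?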
The metrics system tracks three types of measurements:
#[async_trait]
pub trait Metrics: Send + Sync {
    // Track values that can fluctuate (e.g., queue size)
    async fn update_gauge(&self, name: &str, value: f64) -> CarbonResult<()>;

    // Count occurrences (e.g., processed updates)
    async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()>;

    // Measure distributions (e.g., processing times)
    async fn record_histogram(&self, name: &str, value: f64) -> CarbonResult<()>;
}
Key metrics tracked out of the box:
- Update processing rates and latencies
- Success/failure ratios
- Queue depths
- Processing times (min/max/avg)
- Account/Transaction-specific counters
Custom Monitoring
You can implement your own monitoring solution by implementing the Metrics trait:
#[async_trait]
impl Metrics for MyCustomMetrics {
    async fn initialize(&self) -> CarbonResult<()> {
        // Setup your metrics system
        Ok(())
    }

    async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()> {
        // Track your counters
        Ok(())
    }

    // Implement other required methods...
}

// Use it in your pipeline
Pipeline::builder()
    .metrics(Arc::new(MyCustomMetrics::new()))
    .build()?
This modular approach to metrics allows you to:
- Use multiple metrics systems simultaneously
- Integrate with your existing monitoring stack
- Create custom metrics for your specific use case
Benefits of Using Carbon
Faster Development: Stop rebuilding indexing infrastructure and focus on your application logic
Cost-Effective: Choose datasources that match your budget and scaling needs
Maintainable and Customizable: Modular design makes it easy to update and extend your indexing logic
Production-Ready: Built-in metrics, error handling, and proper shutdown management
Community-Driven: Extensible architecture allows for community-contributed components
What's Next?
We're actively working on:
Additional datasource implementations
More utility decoders for popular Solana programs
Enhanced documentation and examples
Transaction pipes with schema matching for instructions and inner instructions
Performance optimizations
Get Involved
Carbon is open for contributions! Whether you're building new datasources, decoders, or just have ideas for improvements, we'd love to hear from you.
- GitHub repository: https://github.com/sevenlabs-hq/carbon
- Examples: https://github.com/sevenlabs-hq/carbon/tree/main/examples
- Discord: https://discord.gg/sevenlabs-hq
Try Carbon today and let us know what you think! We're excited to see what you'll build with it.