Filtering Configuration
Filtering allows you to control when streaming callbacks are triggered, reducing noise and focusing on meaningful changes.
Overview
The filtering system provides:
- Change Detection - Only trigger on actual data changes
- Threshold Filtering - Trigger only on significant changes
- Rate Limiting - Control maximum update frequency
- Field Selection - Monitor specific fields only
FilterConfig Class
Custom filtering configuration for precise control.
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class FilterConfig:
# Fields to monitor for changes
change_fields: List[str] = field(default_factory=lambda: [
"price_usd", "price_native", "volume.h24", "liquidity.usd"
])
# Change thresholds (None = any change triggers)
price_change_threshold: Optional[float] = None # Price change % (e.g., 0.01 = 1%)
volume_change_threshold: Optional[float] = None # Volume change % (e.g., 0.10 = 10%)
liquidity_change_threshold: Optional[float] = None # Liquidity change % (e.g., 0.05 = 5%)
# Rate limiting
max_updates_per_second: Optional[float] = None # Max updates/sec (e.g., 1.0 = 1/sec)
Parameters
change_fields
: List of fields to monitor. Only changes in these fields can trigger updates. Supports nested fields (e.g., "volume.h24")price_change_threshold
: Price change percentage threshold. Set to 0.01 for 1% changesvolume_change_threshold
: Volume change percentage threshold. Set to 0.10 for 10% changesliquidity_change_threshold
: Liquidity change percentage threshold. Set to 0.05 for 5% changesmax_updates_per_second
: Limits update frequency to avoid overwhelming callbacks
Custom Configuration Examples
from dexscreen.utils import FilterConfig
# High-frequency trading config
hft_config = FilterConfig(
change_fields=["price_usd"], # Only monitor price
price_change_threshold=0.0001, # 0.01% changes
max_updates_per_second=10.0 # Allow frequent updates
)
# Long-term monitoring config
hodl_config = FilterConfig(
change_fields=["price_usd", "liquidity.usd"],
price_change_threshold=0.05, # 5% changes
liquidity_change_threshold=0.20, # 20% liquidity changes
max_updates_per_second=0.1 # Once per 10 seconds max
)
# Volume surge detection
volume_config = FilterConfig(
change_fields=["volume.h24", "volume.h1", "transactions.m5.buys", "transactions.m5.sells"],
volume_change_threshold=0.15, # 15% volume changes
max_updates_per_second=0.5 # Once per 2 seconds max
)
FilterPresets
Pre-configured filters for common use cases.
simple_change_detection()
Basic change detection (default behavior) - any monitored field change triggers an update.
Use for: General monitoring where you want all changes
significant_price_changes(threshold)
Only triggers when price changes exceed the threshold.
Use for: Price alerts, trading signals
significant_all_changes(price_threshold, volume_threshold, liquidity_threshold)
config = FilterPresets.significant_all_changes(
price_threshold=0.005, # 0.5% price change
volume_threshold=0.10, # 10% volume change
liquidity_threshold=0.05 # 5% liquidity change
)
All metrics must meet their thresholds to trigger.
Use for: High-confidence signals requiring multiple confirmations
rate_limited(max_per_second)
Limits update frequency regardless of changes.
Use for: UI updates, reducing callback load
ui_friendly()
Optimized for user interfaces:
- Price change threshold: 0.1%
- Volume change threshold: 5%
- Max 2 updates per second
Use for: Dashboard displays, real-time charts
monitoring()
Optimized for monitoring systems:
- Price change threshold: 1%
- Volume change threshold: 10%
- Liquidity change threshold: 5%
- Max 0.2 updates per second (once per 5 seconds)
Use for: Alert systems, background monitoring
Usage Examples
Basic Filtering
# Default filtering (changes only)
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6..."],
callback=handle_update,
filter=True # Default
)
# No filtering (all updates)
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6..."],
callback=handle_update,
filter=False
)
# Preset filtering
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6..."],
callback=handle_update,
filter=FilterPresets.significant_price_changes(0.02) # 2%
)
Advanced Filtering
# Multi-metric monitoring
comprehensive_config = FilterConfig(
change_fields=["price_usd", "volume.h24", "liquidity.usd", "price_change.h24"],
price_change_threshold=0.02, # 2% price change
volume_change_threshold=0.25, # 25% volume change
liquidity_change_threshold=0.10, # 10% liquidity change
max_updates_per_second=1.0 # Max once per second
)
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=addresses,
callback=comprehensive_handler,
filter=comprehensive_config
)
Conditional Filtering
Combine filtering with callback logic:
# Filter config for significant changes
config = FilterConfig(
change_fields=["price_usd", "volume.h1"],
price_change_threshold=0.001 # 0.1%
)
# Additional logic in callback
def smart_callback(pair: TokenPair):
# Filter already ensured significant change
# Add more conditions
if pair.volume.h1 > 10_000: # Minimum volume
if pair.liquidity and pair.liquidity.usd > 50_000: # Minimum liquidity
process_significant_update(pair)
Monitorable Fields
Fields you can monitor in change_fields
:
Price Fields
price_usd
- USD priceprice_native
- Native token price
Volume Fields
volume.h24
- 24-hour volumevolume.h6
- 6-hour volumevolume.h1
- 1-hour volumevolume.m5
- 5-minute volume
Liquidity Fields
liquidity.usd
- USD liquidityliquidity.base
- Base token liquidityliquidity.quote
- Quote token liquidity
Transaction Fields
transactions.m5.buys
- 5-minute buy counttransactions.m5.sells
- 5-minute sell counttransactions.h1.buys
- 1-hour buy counttransactions.h1.sells
- 1-hour sell counttransactions.h6.buys
- 6-hour buy counttransactions.h6.sells
- 6-hour sell counttransactions.h24.buys
- 24-hour buy counttransactions.h24.sells
- 24-hour sell count
Price Change Fields
price_change.m5
- 5-minute price change %price_change.h1
- 1-hour price change %price_change.h6
- 6-hour price change %price_change.h24
- 24-hour price change %
Other Fields
fdv
- Fully diluted valuation
Note: Use dot notation for nested fields (e.g., volume.h24
, transactions.m5.buys
)
Performance Considerations
Filter Efficiency
Filters are applied client-side after data is fetched:
- Data Fetching: Happens at the specified interval
- Change Detection: Compares with previous data
- Threshold Check: Applies configured thresholds
- Rate Limiting: Enforces max update frequency
- Callback Execution: Only if all conditions pass
Optimization Tips
- Minimize Monitored Fields: Only include fields you actually need
# Good - specific fields
change_fields=["price_usd", "volume.h24"]
# Bad - monitoring everything
change_fields=["price_usd", "price_native", "volume.h24", "volume.h6", ...]
- Set Appropriate Thresholds: Balance between sensitivity and noise
# High-value assets - larger threshold
eth_config = FilterConfig(price_change_threshold=0.01) # 1%
# Low-value/volatile assets - smaller threshold
meme_config = FilterConfig(price_change_threshold=0.001) # 0.1%
- Use Rate Limiting: Prevent callback overload
# For UI updates
ui_config = FilterConfig(max_updates_per_second=2.0)
# For logging/database
db_config = FilterConfig(max_updates_per_second=0.1) # Once per 10 sec
Common Patterns
Price Alert System
# Different alert levels
minor_alert = FilterConfig(
change_fields=["price_usd"],
price_change_threshold=0.02, # 2%
max_updates_per_second=1.0
)
major_alert = FilterConfig(
change_fields=["price_usd"],
price_change_threshold=0.05, # 5%
max_updates_per_second=None # No rate limit for major alerts
)
# Subscribe with different callbacks
await client.subscribe_pairs(chain_id, pairs, minor_callback, filter=minor_alert)
await client.subscribe_pairs(chain_id, pairs, major_callback, filter=major_alert)
Multi-Strategy Filtering
strategies = {
"scalping": FilterConfig(
change_fields=["price_usd"],
price_change_threshold=0.0005, # 0.05%
max_updates_per_second=10.0
),
"swing": FilterConfig(
change_fields=["price_usd", "volume.h24"],
price_change_threshold=0.02, # 2%
volume_change_threshold=0.30, # 30%
max_updates_per_second=0.2 # Once per 5 sec
),
"liquidity": FilterConfig(
change_fields=["liquidity.usd"],
liquidity_change_threshold=0.10, # 10%
max_updates_per_second=0.1 # Once per 10 sec
)
}
# Apply different strategies to different pairs
for strategy_name, config in strategies.items():
pairs = get_pairs_for_strategy(strategy_name)
callback = create_strategy_callback(strategy_name)
await client.subscribe_pairs("ethereum", pairs, callback, filter=config)
Production Alert System
from enum import Enum
from datetime import datetime
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
class ProductionAlertSystem:
def __init__(self):
self.alert_count = {level: 0 for level in AlertLevel}
async def deploy_monitoring(self, client, watch_pairs):
"""Deploy production-grade filtering system"""
# Tier 1: Informational (1% moves)
info_filter = FilterConfig(
change_fields=["price_usd"],
price_change_threshold=0.01,
max_updates_per_second=0.5 # Don't spam
)
# Tier 2: Warning (5% moves + volume)
warning_filter = FilterConfig(
change_fields=["price_usd", "volume.h24"],
price_change_threshold=0.05,
volume_change_threshold=0.75,
max_updates_per_second=2.0
)
# Tier 3: Critical (10% moves + liquidity issues)
critical_filter = FilterConfig(
change_fields=["price_usd", "liquidity.usd"],
price_change_threshold=0.10,
liquidity_change_threshold=0.25,
max_updates_per_second=10.0 # No limits on critical
)
# Deploy all tiers
await client.subscribe_pairs(
"ethereum", watch_pairs,
lambda p: self.handle_alert(p, AlertLevel.INFO),
filter=info_filter
)
await client.subscribe_pairs(
"ethereum", watch_pairs,
lambda p: self.handle_alert(p, AlertLevel.WARNING),
filter=warning_filter
)
await client.subscribe_pairs(
"ethereum", watch_pairs,
lambda p: self.handle_alert(p, AlertLevel.CRITICAL),
filter=critical_filter
)
print(f"đ Production monitoring deployed for {len(watch_pairs)} pairs")
async def handle_alert(self, pair, level: AlertLevel):
self.alert_count[level] += 1
if level == AlertLevel.CRITICAL:
await self.send_sms(f"CRITICAL: {pair.base_token.symbol} moved {pair.price_change.h24:+.1f}%")
await self.send_email_alert(pair, level)
elif level == AlertLevel.WARNING:
await self.send_slack_message(pair, level)
else:
print(f"đ {pair.base_token.symbol}: {pair.price_change.h24:+.2f}%")
async def send_sms(self, message): pass # Implement SMS
async def send_email_alert(self, pair, level): pass # Implement email
async def send_slack_message(self, pair, level): pass # Implement Slack
# Usage
alert_system = ProductionAlertSystem()
watch_list = ["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"] # Add your pairs
await alert_system.deploy_monitoring(client, watch_list)