Getting Started with Dexscreen
This guide will help you get up and running with Dexscreen quickly, enabling you to monitor and analyze DeFi data with ease.
Installation
Using uv
Using pip
From source
Basic Concepts
Dexscreen provides two main ways to interact with Dexscreener data:
- Query API - One-time data fetching (synchronous or async) for:
- Current price checks
- Token discovery and search
- Snapshot data analysis
-
Historical data retrieval
-
Streaming API - Real-time updates via HTTP polling for:
- Real-time price monitoring
- Trading bots
- Alert systems
- Portfolio tracking
Your First Query
Create a Client
from dexscreen import DexscreenerClient
# Basic client (recommended for most use cases)
client = DexscreenerClient()
# With browser impersonation (use when encountering anti-bot protection)
client = DexscreenerClient(impersonate="chrome136")
# Debug mode (enable verbose logging during development)
client = DexscreenerClient(debug=True)
Fetch Pair Data
# 1. Search for pairs (by token name or symbol)
pairs = client.search_pairs("PEPE")
if pairs:
print(f"Found {len(pairs)} PEPE pairs")
for pair in pairs[:5]: # Show top 5 results
print(f" {pair.base_token.symbol}/{pair.quote_token.symbol} on {pair.chain_id}")
print(f" Price: ${pair.price_usd:.8f}, DEX: {pair.dex_id}")
print(f" 24h Volume: ${pair.volume.h24:,.0f}")
print()
# 2. Get all pairs for a specific token
usdc_address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" # Ethereum USDC
pairs = client.get_pairs_by_token_address("ethereum", usdc_address)
print(f"Found {len(pairs)} USDC pairs on Ethereum")
# Find the highest liquidity USDC pair
if pairs:
best_usdc_pair = max(pairs, key=lambda p: p.liquidity.usd if p.liquidity else 0)
print(f"Best USDC pair: {best_usdc_pair.base_token.symbol}/{best_usdc_pair.quote_token.symbol}")
print(f"Liquidity: ${best_usdc_pair.liquidity.usd:,.0f}")
# 3. Get details for a specific pair
pair = client.get_pair_by_pair_address(
"ethereum",
"0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640" # Uniswap V3 USDC/WETH
)
if pair:
print(f"Pair Details:")
print(f" Price: ${pair.price_usd:.6f}")
print(f" 24h Volume: ${pair.volume.h24:,.0f}")
print(f" 24h Price Change: {pair.price_change.h24:+.2f}%")
print(f" DEX: {pair.dex_id}")
Real-time Updates
Basic Subscription
import asyncio
from datetime import datetime
async def price_update_handler(pair):
"""Handle price updates - showing timestamp and key info"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] {pair.base_token.symbol}: ${pair.price_usd:,.4f} "
f"(24h: {pair.price_change.h24:+.2f}%)")
async def main():
client = DexscreenerClient()
print("Starting to monitor JUP token price...")
# Subscribe to JUP pair on Solana
await client.subscribe_pairs(
chain_id="solana",
pair_addresses=["JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN"],
callback=price_update_handler,
interval=1.0 # Check for updates every second
)
# Monitor for 30 seconds
print("Monitoring... (30 seconds)")
await asyncio.sleep(30)
# Clean up
print("Closing connections...")
await client.close_streams()
print("Monitoring ended")
# Run the async function
if __name__ == "__main__":
asyncio.run(main())
Monitor Multiple Pairs
async def portfolio_monitor():
client = DexscreenerClient()
# Define your portfolio (chain_id, pair_address, description)
portfolio = [
("ethereum", "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640", "USDC/WETH Uniswap V3"),
("solana", "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN", "JUP/SOL Raydium"),
("bsc", "0x2170ed0880ac9a755fd29b2688956bd959f933f8", "ETH/BNB PancakeSwap"),
]
# Portfolio update handler
async def handle_portfolio_update(pair):
timestamp = datetime.now().strftime("%H:%M:%S")
# Determine which token is the main token (non-stablecoin)
main_token = pair.base_token if pair.base_token.symbol not in ["USDC", "USDT", "BUSD"] else pair.quote_token
print(f"[{timestamp}] [{pair.chain_id.upper()}] {main_token.symbol}: "
f"${pair.price_usd:,.4f} ({pair.price_change.h24:+.2f}%) "
f"Vol: ${pair.volume.h24:,.0f}")
print("Starting portfolio monitoring...")
# Subscribe to all pairs
for chain_id, pair_address, description in portfolio:
print(f"Subscribing to {description} on {chain_id}")
await client.subscribe_pairs(
chain_id=chain_id,
pair_addresses=[pair_address],
callback=handle_portfolio_update,
interval=2.0 # Check every 2 seconds
)
print(f"\nMonitoring {len(portfolio)} pairs, press Ctrl+C to stop...")
try:
# Monitor for 1 minute (would be longer in production)
await asyncio.sleep(60)
except KeyboardInterrupt:
print("\nInterrupt received, stopping...")
finally:
await client.close_streams()
print("Portfolio monitoring stopped")
if __name__ == "__main__":
asyncio.run(portfolio_monitor())
Filtering Updates
Control when your callback is triggered using filters to reduce unnecessary notifications:
from dexscreen import FilterPresets
# 1. Only trigger on significant price changes (1% threshold)
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"],
callback=handle_update,
filter=FilterPresets.significant_price_changes(0.01), # 1% price change
interval=0.5 # Check every 0.5 seconds
)
# 2. Limit update frequency to avoid overload
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"],
callback=handle_update,
filter=FilterPresets.rate_limited(1.0), # Max 1 update per second
interval=0.2 # Poll quickly but limit callback frequency
)
# 3. UI-friendly filtering (balanced update frequency and usefulness)
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"],
callback=handle_update,
filter=FilterPresets.ui_friendly(), # Pre-configured for UI optimization
interval=1.0
)
Error Handling
Always handle errors in your callbacks to prevent subscription interruptions in production:
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def safe_callback(pair):
try:
# Your business logic
if pair.price_usd and pair.price_usd > 100:
print(f"High value token: {pair.base_token.symbol} = ${pair.price_usd:,.2f}")
# Check for anomalous price changes
if abs(pair.price_change.h24) > 50: # 50% change in 24h
logger.warning(f"Anomalous price change: {pair.base_token.symbol} {pair.price_change.h24:+.2f}%")
except AttributeError as e:
logger.error(f"Missing data field: {e}")
except TypeError as e:
logger.error(f"Data type error: {e}")
except Exception as e:
logger.error(f"Unknown error processing update: {e}")
# Important: Don't re-raise to avoid breaking the subscription
# Use the safe callback
async def robust_monitoring():
client = DexscreenerClient()
try:
await client.subscribe_pairs(
chain_id="ethereum",
pair_addresses=["0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"],
callback=safe_callback,
interval=1.0
)
await asyncio.sleep(30)
except Exception as e:
logger.error(f"Failed to set up subscription: {e}")
finally:
await client.close_streams()
logger.info("Monitoring session ended")
Async vs Sync
Dexscreen supports both patterns:
Synchronous Mode
# Simple and straightforward, ideal for scripts and one-time queries
client = DexscreenerClient()
# Synchronous search
pairs = client.search_pairs("PEPE")
print(f"Found {len(pairs)} PEPE pairs")
# Synchronous get specific pair
pair = client.get_pair_by_pair_address("ethereum", "0x88e6...")
if pair:
print(f"Current price: ${pair.price_usd}")
Asynchronous Mode
# Better for concurrent operations, real-time monitoring, and high-performance applications
async def fetch_multiple_tokens():
client = DexscreenerClient()
# Define tokens to search
tokens = ["PEPE", "SHIB", "DOGE", "FLOKI"]
print("Searching multiple tokens concurrently...")
# Run multiple queries concurrently (faster)
tasks = [client.search_pairs_async(token) for token in tokens]
results = await asyncio.gather(*tasks)
# Process results
for token, token_pairs in zip(tokens, results):
if token_pairs:
best_pair = max(token_pairs, key=lambda p: p.volume.h24 or 0)
print(f"{token}: Found {len(token_pairs)} pairs, "
f"highest volume: ${best_pair.volume.h24:,.0f}")
else:
print(f"{token}: No pairs found")
# Run the async function
if __name__ == "__main__":
asyncio.run(fetch_multiple_tokens())
Next Steps
đ Deep Dive
- Query API - Learn all available data fetching methods
- Streaming API - Master real-time data monitoring techniques
- Data Models - Understand the data structures returned by the API
- Filtering - Learn advanced filtering techniques for performance optimization
- Examples - View complete production-ready code examples
đ Project Ideas
- Price Alert Bot: Monitor token price changes and send notifications
- Arbitrage Scanner: Find arbitrage opportunities across different DEXs and chains
- Portfolio Dashboard: Track your DeFi portfolio in real-time
- New Token Discovery Tool: Automatically discover and analyze newly listed tokens
- Liquidity Monitoring System: Track large liquidity changes
FAQ & Troubleshooting
đĻ Rate Limiting
Problem: Encountering rate limit errors Solution:
- The SDK handles rate limits automatically with backoff retry
- If frequently hitting limits, consider increasing polling intervals
- Use batch methods (e.g.,
get_pairs_by_pairs_addresses
) instead of multiple individual calls
đ No Data Returned
Common causes:
- â Wrong chain_id: Use
"ethereum"
not"eth"
- â Invalid contract addresses: Ensure addresses are correctly formatted and checksummed
- â Token doesn't exist: Some tokens may not have pairs on certain chains
- â New tokens: Recently launched tokens may not be indexed yet
Solutions:
# Verify address format
from web3 import Web3
address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
if Web3.isAddress(address):
pairs = client.get_pairs_by_token_address("ethereum", address)
else:
print("Invalid Ethereum address")
đ Subscription Not Updating
Diagnostic steps:
- Check filter configuration: Ensure filters aren't filtering out all updates
- Verify pair activity: Ensure the pair has actual trading activity
pair = client.get_pair_by_pair_address(chain_id, pair_address)
if pair and pair.volume.h24 > 0:
print("Pair is active")
else:
print("Pair may be inactive")
- Check callback errors: Ensure callback function isn't throwing exceptions
đ§ Connection Issues
Problem: Can't connect to Dexscreener API
Solutions:
# Enable browser impersonation to bypass anti-bot protection
client = DexscreenerClient(impersonate="chrome136")
# Or enable debug mode to see detailed errors
client = DexscreenerClient(debug=True)
đž Memory Usage
Problem: High memory usage after long-running sessions
Solutions:
- Periodically clean up unneeded subscriptions
- Use appropriate filters to reduce data processing
- Implement data rotation strategies
# Example periodic cleanup
import asyncio
from datetime import datetime, timedelta
class ManagedClient:
def __init__(self):
self.client = DexscreenerClient()
self.last_cleanup = datetime.now()
async def periodic_cleanup(self):
while True:
await asyncio.sleep(3600) # Check every hour
if datetime.now() - self.last_cleanup > timedelta(hours=6):
print("Performing periodic cleanup...")
await self.client.close_streams()
self.client = DexscreenerClient() # Create fresh instance
self.last_cleanup = datetime.now()
đ Getting Help
Resource Type | Link | Use Case |
---|---|---|
đ Complete Examples | Examples Page | Need working code references |
đ API Reference | Query API | Learn specific method usage |
đ Bug Reports | GitHub Issues | Found a bug or issue |
đĄ Feature Requests | GitHub Discussions | Suggest new features |
đŦ Community Discussion | GitHub Discussions | General questions and discussion |
đ Best Practices When Seeking Help
- Provide complete error messages and relevant code snippets
- Explain your use case and expected behavior
- Include system information (Python version, OS, etc.)
- Check existing issues first to see if there's already a solution
đĄ Tip: Most issues can be resolved by checking the complete code in Examples!
đ Congratulations! You now have the fundamentals of Dexscreen. Continue exploring Examples to learn more advanced usage, or start building your first DeFi monitoring application!