跳转至

查询 API 参考

查询 API 提供同步和异步方法从 Dexscreener 获取数据。所有方法都有同步和异步版本。

概述

查询方法是一次性数据获取,立即返回当前数据。它们适用于:

  • 获取当前价格
  • 搜索代币
  • 获取交易对信息
  • 检索代币档案

速率限制

  • 交易对查询:300 请求/分钟
  • 代币档案/订单:60 请求/分钟

SDK 自动处理速率限制和重试逻辑。

交易对查询 (Pair Queries)

get_pair / get_pair_async

def get_pair(address: str) -> Optional[TokenPair]
async def get_pair_async(address: str) -> Optional[TokenPair]

获取单个交易对信息(通过搜索实现,无需指定链)。

示例:

# 同步
pair = client.get_pair("JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN")
if pair:
    print(f"{pair.base_token.symbol}: ${pair.price_usd}")

# 异步
pair = await client.get_pair_async("JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN")

get_pair_by_pair_address / get_pair_by_pair_address_async

def get_pair_by_pair_address(chain_id: str, pair_address: str) -> Optional[TokenPair]
async def get_pair_by_pair_address_async(chain_id: str, pair_address: str) -> Optional[TokenPair]

获取指定链上的交易对信息。

参数:

  • chain_id:区块链标识符(如 "ethereum"、"solana"、"bsc")
  • pair_address:交易对合约地址

示例:

pair = client.get_pair_by_pair_address(
    "ethereum",
    "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"  # USDC/WETH
)

get_pairs_by_pairs_addresses / get_pairs_by_pairs_addresses_async

def get_pairs_by_pairs_addresses(chain_id: str, pair_addresses: List[str]) -> List[TokenPair]
async def get_pairs_by_pairs_addresses_async(chain_id: str, pair_addresses: List[str]) -> List[TokenPair]

批量获取交易对信息(同一链)。最多支持 30 个地址,超过将抛出 ValueError。

示例:

pairs = client.get_pairs_by_pairs_addresses(
    "ethereum",
    [
        "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",
        "0x11b815efb8f581194ae79006d24e0d814b7697f6",
        "0x4e68ccd3e89f51c3074ca5072bbac773960dfa36"
    ]
)

search_pairs / search_pairs_async

def search_pairs(query: str) -> List[TokenPair]
async def search_pairs_async(query: str) -> List[TokenPair]

搜索交易对(按名称、符号或地址)。

示例:

# 按符号搜索
results = client.search_pairs("PEPE")

# 按名称搜索
results = client.search_pairs("Shiba Inu")

# 按地址搜索
results = client.search_pairs("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640")

get_pairs_by_token_address / get_pairs_by_token_address_async

def get_pairs_by_token_address(chain_id: str, token_address: str) -> List[TokenPair]
async def get_pairs_by_token_address_async(chain_id: str, token_address: str) -> List[TokenPair]

获取指定链上单个代币的所有交易对。返回该代币在该链所有 DEX 上的所有交易对。

示例:

# 获取以太坊上的所有 USDC 交易对
usdc_pairs = client.get_pairs_by_token_address(
    "ethereum",
    "A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # USDC
)

for pair in usdc_pairs:
    other_token = pair.quote_token if pair.base_token.address.lower() == usdc_address.lower() else pair.base_token
    print(f"USDC/{other_token.symbol}{pair.dex_id}: ${pair.price_usd}")

get_pairs_by_token_addresses / get_pairs_by_token_addresses_async

def get_pairs_by_token_addresses(chain_id: str, token_addresses: List[str]) -> List[TokenPair]
async def get_pairs_by_token_addresses_async(chain_id: str, token_addresses: List[str]) -> List[TokenPair]

批量获取指定链上多个代币的所有交易对。返回所有包含任意指定代币的交易对(去重)。

注意

  • 需要指定链 ID(如 "solana"、"ethereum" 等)
  • 最多支持 30 个代币地址,超过将抛出 ValueError
  • API 最多返回 30 个交易对(最相关/活跃的)
  • 返回的是所有包含这些代币的交易对集合,如果一个交易对包含多个指定的代币(如 USDC/SOL 对同时包含 USDC 和 SOL),它只会出现一次

示例:

# 获取多个代币的交易对
token_addresses = [
    "A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",  # USDC
    "dac17f958d2ee523a2206206994597c13d831ec7",  # USDT
    "6B175474E89094C44Da98b954EedeAC495271d0F"   # DAI
]

stablecoin_pairs = client.get_pairs_by_token_addresses("ethereum", token_addresses)

代币信息查询 (Token Information)

get_latest_token_profiles / get_latest_token_profiles_async

def get_latest_token_profiles() -> List[TokenInfo]
async def get_latest_token_profiles_async() -> List[TokenInfo]

获取最新代币档案。速率限制:60请求/分钟。

示例:

profiles = client.get_latest_token_profiles()
for token in profiles[:10]:
    print(f"{token.token_address}: {token.description}")

get_latest_boosted_tokens / get_latest_boosted_tokens_async

def get_latest_boosted_tokens() -> List[TokenInfo]
async def get_latest_boosted_tokens_async() -> List[TokenInfo]

获取最新推广代币。速率限制:60请求/分钟。

示例:

boosted = client.get_latest_boosted_tokens()
for token in boosted:
    print(f"{token.chain_id}: {token.token_address} - 推广: {token.amount}")

get_tokens_most_active / get_tokens_most_active_async

def get_tokens_most_active() -> List[TokenInfo]
async def get_tokens_most_active_async() -> List[TokenInfo]

获取最活跃推广代币。速率限制:60请求/分钟。

池信息查询 (Pool Information)

get_pools_by_token_address / get_pools_by_token_address_async

def get_pools_by_token_address(chain_id: str, token_address: str) -> List[TokenPair]
async def get_pools_by_token_address_async(chain_id: str, token_address: str) -> List[TokenPair]

使用 token-pairs/v1 端点获取池信息。与 get_pairs_by_pairs_addresses 类似,但使用不同的 API 端点。速率限制:300请求/分钟。

订单查询 (Order Queries)

get_orders_paid_of_token / get_orders_paid_of_token_async

def get_orders_paid_of_token(chain_id: str, token_address: str) -> List[OrderInfo]
async def get_orders_paid_of_token_async(chain_id: str, token_address: str) -> List[OrderInfo]

获取代币付费订单。速率限制:60请求/分钟。

示例:

orders = client.get_orders_paid_of_token("ethereum", token_address)
for order in orders:
    print(f"订单 {order.type}: {order.status}{order.payment_timestamp}")

最佳实践

1. 多查询使用异步

获取多个代币/交易对数据时,使用异步方法:

import asyncio

async def fetch_multiple_tokens():
    client = DexscreenerClient()

    tokens = ["address1", "address2", "address3"]

    # 并发获取
    tasks = [
        client.get_pairs_by_token_address_async("ethereum", addr)
        for addr in tokens
    ]

    results = await asyncio.gather(*tasks)
    return results

2. 处理速率限制

SDK 自动处理速率限制,但您也可以主动处理:

# 为代币档案请求留出间隔(60/分钟限制)
import time

for token_address in large_token_list:
    orders = client.get_orders_paid_of_token("ethereum", token_address)
    process_orders(orders)
    time.sleep(1.1)  # 安全起见约 54 请求每分钟

3. 错误处理

始终处理潜在的 None 返回:

pair = client.get_pair_by_pair_address("ethereum", pair_address)
if pair:
    # 处理交易对数据
    print(f"价格: ${pair.price_usd}")
else:
    print(f"未找到交易对 {pair_address}")

4. 批量操作

对于同一链上的多个地址,使用批量方法:

# 不要这样做:
pairs = []
for address in addresses:
    pair = client.get_pair_by_pair_address("ethereum", address)
    if pair:
        pairs.append(pair)

# 应该这样做:
pairs = client.get_pairs_by_pairs_addresses("ethereum", addresses)

常见模式

寻找套利机会

async def find_arbitrage(token_address: str):
    client = DexscreenerClient()

    # 获取多条链上的交易对
    chains = ["ethereum", "bsc", "polygon", "arbitrum"]

    tasks = [
        client.get_pairs_by_token_address_async(chain, token_address)
        for chain in chains
    ]

    all_pairs = await asyncio.gather(*tasks)

    # 扁平化并分析
    prices_by_chain = {}
    for chain, pairs in zip(chains, all_pairs):
        if pairs:
            # 获取流动性最高的交易对
            best_pair = max(pairs, key=lambda p: p.liquidity.usd or 0)
            prices_by_chain[chain] = best_pair.price_usd

    # 寻找套利机会
    if len(prices_by_chain) > 1:
        min_price = min(prices_by_chain.values())
        max_price = max(prices_by_chain.values())
        spread = ((max_price - min_price) / min_price) * 100

        if spread > 1:  # 1% 阈值
            print(f"套利机会: {spread:.2f}% 价差")
            return prices_by_chain

代币发现

def discover_new_tokens():
    client = DexscreenerClient()

    # 获取最新档案
    profiles = client.get_latest_token_profiles()

    # 过滤有趣的代币
    interesting = []
    for token in profiles:
        # 获取交易数据
        pairs = client.get_pairs_by_token_address(token.chain_id, token.token_address)

        if pairs:
            total_volume = sum(p.volume.h24 for p in pairs if p.volume.h24)
            total_liquidity = sum(p.liquidity.usd for p in pairs if p.liquidity and p.liquidity.usd)

            if total_volume > 100_000 and total_liquidity > 50_000:
                interesting.append({
                    'token': token,
                    'pairs': pairs,
                    'volume': total_volume,
                    'liquidity': total_liquidity
                })

    return interesting