跳转至

代码示例与使用场景

有效使用 Jupiter Python SDK 的真实示例和模式。

📚 目录

🚀 基础示例

简单代币交换

最基本的用例 - 将一种代币交换为另一种:

import asyncio
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient
from pyjupiter.models.ultra_api.ultra_order_request_model import UltraOrderRequest

async def simple_swap():
    """交换 0.1 WSOL 为 USDC"""
    client = AsyncUltraApiClient()

    try:
        # 获取钱包地址
        wallet = await client.get_public_key()
        print(f"🔍 使用钱包: {wallet}")

        # 创建交换订单
        order = UltraOrderRequest(
            input_mint="So11111111111111111111111111111111111111112",  # WSOL
            output_mint="EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",  # USDC
            amount=100_000_000,  # 0.1 WSOL
            taker=wallet
        )

        # 执行交换
        result = await client.order_and_execute(order)

        if result.get("status") == "Success":
            print(f"✅ 交换成功!")
            print(f"📋 交易链接: https://solscan.io/tx/{result['signature']}")
        else:
            print(f"❌ 交换失败: {result.get('error')}")

    finally:
        await client.close()

asyncio.run(simple_swap())

检查多个代币余额

import asyncio
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

async def check_portfolio():
    """检查多个代币的余额"""
    client = AsyncUltraApiClient()

    try:
        wallet = await client.get_public_key()
        balances = await client.balances(wallet)

        print(f"💰 投资组合 {wallet[:8]}...")
        print("=" * 50)

        total_value = 0
        for token, details in balances.items():
            amount = details.get('uiAmount', 0)
            frozen = details.get('isFrozen', False)

            status_icon = "🧊" if frozen else "✅"
            print(f"{status_icon} {token:<8} {amount:>15.6f}")

        print("=" * 50)

    finally:
        await client.close()

asyncio.run(check_portfolio())

代币安全检查

import asyncio
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

async def safety_check():
    """检查热门代币的安全性"""
    client = AsyncUltraApiClient()

    # 热门 Solana 代币
    popular_tokens = [
        ("WSOL", "So11111111111111111111111111111111111111112"),
        ("USDC", "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"),
        ("USDT", "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB"),
        ("BONK", "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263"),
    ]

    try:
        mints = [mint for _, mint in popular_tokens]
        shield_result = await client.shield(mints)

        print("🛡️ 代币安全报告")
        print("=" * 40)

        for name, mint in popular_tokens:
            warnings = shield_result.get("warnings", {}).get(mint, [])

            if warnings:
                print(f"⚠️  {name:<8} - {len(warnings)} 个警告")
                for warning in warnings:
                    print(f"   └─ {warning.get('type')}: {warning.get('message')}")
            else:
                print(f"✅ {name:<8} - 安全")

    finally:
        await client.close()

asyncio.run(safety_check())

📈 交易策略

定投策略 (DCA)

import asyncio
from datetime import datetime
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient
from pyjupiter.models.ultra_api.ultra_order_request_model import UltraOrderRequest

class DCABot:
    def __init__(self, api_key=None):
        self.client = AsyncUltraApiClient(api_key=api_key)

    async def dca_buy(self, input_mint, output_mint, amount_sol, frequency_hours=24):
        """执行定投策略"""
        try:
            wallet = await self.client.get_public_key()

            # 检查当前余额
            balances = await self.client.balances(wallet)
            sol_balance = balances.get("SOL", {}).get("uiAmount", 0)

            if sol_balance < amount_sol:
                print(f"❌ SOL 余额不足: {sol_balance}")
                return

            # 创建订单
            order = UltraOrderRequest(
                input_mint=input_mint,
                output_mint=output_mint,
                amount=int(amount_sol * 10**9),  # 转换为 lamports
                taker=wallet
            )

            print(f"🔄 定投: 用 {amount_sol} SOL 购买代币...")
            result = await self.client.order_and_execute(order)

            if result.get("status") == "Success":
                print(f"✅ 定投执行成功!")
                print(f"📋 交易: {result['signature']}")

                # 记录交易
                timestamp = datetime.now().isoformat()
                print(f"📅 {timestamp}: 用 {amount_sol} SOL 购买")
            else:
                print(f"❌ 定投失败: {result.get('error')}")

        except Exception as e:
            print(f"💥 定投错误: {e}")

    async def close(self):
        await self.client.close()

# 使用方法
async def run_dca():
    bot = DCABot()
    try:
        await bot.dca_buy(
            input_mint="So11111111111111111111111111111111111111112",  # WSOL
            output_mint="DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263",  # BONK
            amount_sol=0.01  # 用 0.01 SOL 购买
        )
    finally:
        await bot.close()

asyncio.run(run_dca())

套利扫描器

import asyncio
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient
from pyjupiter.models.ultra_api.ultra_order_request_model import UltraOrderRequest

class ArbitrageScanner:
    def __init__(self):
        self.client = AsyncUltraApiClient()

    async def scan_arbitrage(self, token_pairs, min_profit_bps=50):
        """扫描套利机会"""
        opportunities = []

        for input_mint, output_mint in token_pairs:
            try:
                # 获取 1 SOL 的报价
                test_amount = 1_000_000_000  # 1 SOL

                # 正向交易: input -> output
                forward_order = UltraOrderRequest(
                    input_mint=input_mint,
                    output_mint=output_mint,
                    amount=test_amount,
                    taker=await self.client.get_public_key()
                )

                forward_quote = await self.client.order(forward_order)

                if forward_quote.get("status") == "Success":
                    # 计算潜在利润
                    # 这里简化了 - 实际情况下需要解析交易以获得确切的输出金额
                    print(f"📊 分析 {input_mint[:8]}... -> {output_mint[:8]}...")

                    # 您需要在这里实现利润计算逻辑
                    # opportunities.append({...})

            except Exception as e:
                print(f"❌ 扫描 {input_mint[:8]}... 时出错: {e}")

        return opportunities

    async def close(self):
        await self.client.close()

# 使用方法
async def run_arbitrage_scan():
    scanner = ArbitrageScanner()
    try:
        pairs = [
            ("So11111111111111111111111111111111111111112", "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"),  # WSOL/USDC
            ("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB"),  # USDC/USDT
        ]

        opportunities = await scanner.scan_arbitrage(pairs)
        print(f"发现 {len(opportunities)} 个套利机会")

    finally:
        await scanner.close()

asyncio.run(run_arbitrage_scan())

💼 投资组合管理

投资组合再平衡器

import asyncio
from typing import Dict
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient
from pyjupiter.models.ultra_api.ultra_order_request_model import UltraOrderRequest

class PortfolioRebalancer:
    def __init__(self):
        self.client = AsyncUltraApiClient()

    async def rebalance_portfolio(self, target_allocations: Dict[str, float]):
        """
        重新平衡投资组合到目标配置

        参数:
            target_allocations: {"SOL": 0.5, "USDC": 0.3, "BONK": 0.2}
        """
        try:
            wallet = await self.client.get_public_key()
            balances = await self.client.balances(wallet)

            print("🔄 开始投资组合重新平衡...")
            print("=" * 50)

            # 计算当前配置
            total_value = 0  # 您需要获取美元价值
            current_allocations = {}

            for token, details in balances.items():
                amount = details.get('uiAmount', 0)
                # 简化 - 您需要真实的价格数据
                current_allocations[token] = amount

            print("📊 当前与目标配置:")
            for token, target in target_allocations.items():
                current = current_allocations.get(token, 0)
                print(f"{token:<8} 当前: {current:>8.2f}% 目标: {target*100:>6.1f}%")

            # 执行重新平衡交易
            # 这里您需要实现实际的重新平衡逻辑
            print("✅ 重新平衡完成!")

        except Exception as e:
            print(f"❌ 重新平衡失败: {e}")

    async def close(self):
        await self.client.close()

# 使用方法
async def rebalance():
    rebalancer = PortfolioRebalancer()
    try:
        await rebalancer.rebalance_portfolio({
            "SOL": 0.4,
            "USDC": 0.4,
            "BONK": 0.2
        })
    finally:
        await rebalancer.close()

asyncio.run(rebalance())

多钱包投资组合跟踪器

import asyncio
from typing import List
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class MultiWalletTracker:
    def __init__(self):
        self.client = AsyncUltraApiClient()

    async def track_wallets(self, wallet_addresses: List[str]):
        """并发跟踪多个钱包"""
        print("👥 多钱包投资组合跟踪器")
        print("=" * 60)

        # 为并发余额获取创建任务
        tasks = [
            self.get_wallet_summary(address)
            for address in wallet_addresses
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 汇总结果
        total_wallets = len(wallet_addresses)
        successful_wallets = sum(1 for r in results if not isinstance(r, Exception))

        print(f"\n📊 摘要: {successful_wallets}/{total_wallets} 个钱包已处理")

    async def get_wallet_summary(self, address: str):
        """获取单个钱包的摘要"""
        try:
            balances = await self.client.balances(address)

            print(f"\n💰 钱包: {address[:8]}...{address[-8:]}")
            print("-" * 40)

            token_count = len(balances)
            total_tokens = sum(
                details.get('uiAmount', 0)
                for details in balances.values()
            )

            print(f"📈 代币数量: {token_count}")

            # 显示主要持仓
            sorted_balances = sorted(
                balances.items(),
                key=lambda x: x[1].get('uiAmount', 0),
                reverse=True
            )

            for token, details in sorted_balances[:5]:  # 前 5 位
                amount = details.get('uiAmount', 0)
                if amount > 0:
                    print(f"   {token:<8} {amount:>12.6f}")

            return {
                "address": address,
                "token_count": token_count,
                "balances": balances
            }

        except Exception as e:
            print(f"❌ 获取 {address[:8]}... 时出错: {e}")
            return e

    async def close(self):
        await self.client.close()

# 使用方法
async def track_multiple_wallets():
    tracker = MultiWalletTracker()
    try:
        # 在这里添加您的钱包地址
        wallets = [
            "YourWalletAddress1...",
            "YourWalletAddress2...",
            # 添加更多钱包地址
        ]

        await tracker.track_wallets(wallets)

    finally:
        await tracker.close()

# asyncio.run(track_multiple_wallets())

🔍 代币分析

代币风险评估

import asyncio
from typing import Dict, List
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class TokenAnalyzer:
    def __init__(self):
        self.client = AsyncUltraApiClient()

    async def analyze_token_list(self, token_list: List[Dict[str, str]]):
        """分析代币列表的风险"""
        print("🔍 代币风险分析")
        print("=" * 50)

        # 提取铸造地址
        mints = [token["mint"] for token in token_list]

        # 批量安全检查
        shield_result = await self.client.shield(mints)
        warnings_dict = shield_result.get("warnings", {})

        # 分析每个代币
        for token in token_list:
            name = token["name"]
            mint = token["mint"]
            warnings = warnings_dict.get(mint, [])

            print(f"\n🪙 {name} ({mint[:8]}...)")
            print("-" * 30)

            if not warnings:
                print("✅ 未检测到安全警告")
                risk_score = "低"
            else:
                risk_score = self.calculate_risk_score(warnings)
                print(f"⚠️  检测到 {len(warnings)} 个警告:")

                for warning in warnings:
                    warning_type = warning.get("type", "未知")
                    message = warning.get("message", "无消息")
                    print(f"   • {warning_type}: {message}")

            print(f"🎯 风险评分: {risk_score}")

    def calculate_risk_score(self, warnings: List[Dict]) -> str:
        """根据警告计算风险评分"""
        if not warnings:
            return "低"

        high_risk_types = ["rugpull", "scam", "suspicious"]
        medium_risk_types = ["liquidity", "volume"]

        for warning in warnings:
            warning_type = warning.get("type", "").lower()
            if any(risk in warning_type for risk in high_risk_types):
                return "高"
            elif any(risk in warning_type for risk in medium_risk_types):
                return "中"

        return "中" if len(warnings) > 2 else "低"

    async def close(self):
        await self.client.close()

# 使用方法
async def analyze_tokens():
    analyzer = TokenAnalyzer()
    try:
        tokens_to_analyze = [
            {"name": "包装 SOL", "mint": "So11111111111111111111111111111111111111112"},
            {"name": "美元硬币", "mint": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"},
            {"name": "Bonk", "mint": "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263"},
        ]

        await analyzer.analyze_token_list(tokens_to_analyze)

    finally:
        await analyzer.close()

asyncio.run(analyze_tokens())

市场扫描器

import asyncio
from typing import List, Dict
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class MarketScanner:
    def __init__(self):
        self.client = AsyncUltraApiClient()

    async def scan_trending_tokens(self, base_mints: List[str]):
        """根据交易量扫描热门代币"""
        print("📈 市场扫描器 - 热门代币")
        print("=" * 50)

        trending_data = []

        for mint in base_mints:
            try:
                # 首先检查安全性
                shield_result = await self.client.shield([mint])
                warnings = shield_result.get("warnings", {}).get(mint, [])

                safety_status = "🔴 有风险" if warnings else "🟢 安全"

                print(f"\n🔍 分析 {mint[:8]}...")
                print(f"   安全性: {safety_status}")

                if warnings:
                    print(f"   警告: {len(warnings)} 个")
                    for warning in warnings[:2]:  # 显示前 2 个警告
                        print(f"     • {warning.get('type')}")

                trending_data.append({
                    "mint": mint,
                    "safety_status": safety_status,
                    "warning_count": len(warnings)
                })

            except Exception as e:
                print(f"❌ 分析 {mint[:8]}... 时出错: {e}")

        # 按安全性排序(安全代币优先)
        trending_data.sort(key=lambda x: x["warning_count"])

        print(f"\n📊 扫描结果 ({len(trending_data)} 个代币):")
        print("-" * 40)

        for data in trending_data:
            mint = data["mint"]
            status = data["safety_status"]
            warnings = data["warning_count"]
            print(f"{status} {mint[:8]}... ({warnings} 个警告)")

    async def close(self):
        await self.client.close()

# 使用方法
async def scan_market():
    scanner = MarketScanner()
    try:
        # 要扫描的热门代币铸造地址
        tokens = [
            "So11111111111111111111111111111111111111112",  # WSOL
            "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",  # USDC
            "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB",  # USDT
            "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263",  # BONK
        ]

        await scanner.scan_trending_tokens(tokens)

    finally:
        await scanner.close()

asyncio.run(scan_market())

⚡ 高级模式

连接池管理器

import asyncio
from typing import List
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class ConnectionPoolManager:
    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        self.clients: List[AsyncUltraApiClient] = []
        self.current_index = 0

    async def initialize(self):
        """初始化连接池"""
        print(f"🔗 初始化连接池 (大小: {self.pool_size})")

        for i in range(self.pool_size):
            client = AsyncUltraApiClient()
            self.clients.append(client)

    def get_client(self) -> AsyncUltraApiClient:
        """从池中获取下一个客户端(轮询)"""
        client = self.clients[self.current_index]
        self.current_index = (self.current_index + 1) % self.pool_size
        return client

    async def batch_operation(self, addresses: List[str]):
        """使用连接池执行批量操作"""
        semaphore = asyncio.Semaphore(self.pool_size)

        async def process_address(address):
            async with semaphore:
                client = self.get_client()
                try:
                    return await client.balances(address)
                except Exception as e:
                    return {"error": str(e)}

        tasks = [process_address(addr) for addr in addresses]
        results = await asyncio.gather(*tasks)
        return results

    async def close_all(self):
        """关闭池中的所有连接"""
        print("🔌 关闭连接池...")
        for client in self.clients:
            await client.close()

# 使用方法
async def use_connection_pool():
    pool = ConnectionPoolManager(pool_size=3)
    try:
        await pool.initialize()

        # 模拟批量处理
        addresses = [
            "11111111111111111111111111111111",  # 示例地址
            "22222222222222222222222222222222",
            "33333333333333333333333333333333",
        ]

        results = await pool.batch_operation(addresses)
        print(f"✅ 处理了 {len(results)} 个地址")

    finally:
        await pool.close_all()

# asyncio.run(use_connection_pool())

限速交易机器人

import asyncio
from datetime import datetime, timedelta
from collections import deque
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class RateLimitedBot:
    def __init__(self, max_requests_per_minute: int = 60):
        self.client = AsyncUltraApiClient()
        self.max_requests = max_requests_per_minute
        self.request_times = deque()

    async def rate_limited_request(self, operation):
        """执行带限速的操作"""
        now = datetime.now()

        # 移除旧请求(超过 1 分钟的)
        while self.request_times and self.request_times[0] < now - timedelta(minutes=1):
            self.request_times.popleft()

        # 检查是否达到限制
        if len(self.request_times) >= self.max_requests:
            sleep_time = 60 - (now - self.request_times[0]).total_seconds()
            if sleep_time > 0:
                print(f"⏳ 已达到速率限制,等待 {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)

        # 记录这次请求
        self.request_times.append(now)

        # 执行操作
        return await operation()

    async def trading_loop(self, trading_pairs: list):
        """带限速的主交易循环"""
        print("🤖 启动限速交易机器人...")

        while True:
            try:
                for pair in trading_pairs:
                    input_mint, output_mint = pair

                    # 限速余额检查
                    wallet = await self.rate_limited_request(
                        lambda: self.client.get_public_key()
                    )

                    balances = await self.rate_limited_request(
                        lambda: self.client.balances(wallet)
                    )

                    print(f"📊 检查了 {wallet[:8]}... 的余额")

                    # 在这里添加您的交易逻辑

                    # 交易对之间的等待
                    await asyncio.sleep(5)

                # 交易周期之间的等待
                await asyncio.sleep(60)

            except KeyboardInterrupt:
                print("🛑 用户停止机器人")
                break
            except Exception as e:
                print(f"❌ 交易循环错误: {e}")
                await asyncio.sleep(30)  # 重试前等待

    async def close(self):
        await self.client.close()

# 使用方法
async def run_trading_bot():
    bot = RateLimitedBot(max_requests_per_minute=30)
    try:
        pairs = [
            ("So11111111111111111111111111111111111111112", "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"),
        ]

        await bot.trading_loop(pairs)

    finally:
        await bot.close()

# asyncio.run(run_trading_bot())

🚨 错误处理模式

综合错误处理器

import asyncio
import logging
from typing import Any, Callable
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class ErrorHandler:
    def __init__(self):
        self.client = AsyncUltraApiClient()
        self.setup_logging()

    def setup_logging(self):
        """设置错误跟踪日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('jupiter_bot.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def safe_execute(self, operation: Callable, *args, **kwargs) -> Any:
        """执行带综合错误处理的操作"""
        try:
            result = await operation(*args, **kwargs)

            # 检查结果是否表示失败
            if isinstance(result, dict) and result.get("status") == "Failed":
                error_code = result.get("code")
                error_message = result.get("error")

                self.logger.error(f"API 错误 [{error_code}]: {error_message}")

                # 处理特定错误代码
                if error_code == "INSUFFICIENT_BALANCE":
                    self.logger.warning("余额不足 - 停止操作")
                    return None
                elif error_code == "SLIPPAGE_EXCEEDED":
                    self.logger.info("滑点超出 - 以更高容忍度重试")
                    # 可以在这里实现重试逻辑
                    return None
                elif error_code == "RATE_LIMITED":
                    self.logger.warning("速率限制 - 退避")
                    await asyncio.sleep(60)
                    return None

            return result

        except ConnectionError as e:
            self.logger.error(f"连接错误: {e}")
            self.logger.info("30 秒后重试...")
            await asyncio.sleep(30)
            return None

        except TimeoutError as e:
            self.logger.error(f"超时错误: {e}")
            self.logger.info("操作超时 - 重试...")
            return None

        except ValueError as e:
            self.logger.error(f"配置错误: {e}")
            self.logger.critical("检查您的配置和私钥")
            raise  # 配置无效时不继续

        except Exception as e:
            self.logger.error(f"意外错误: {e}")
            self.logger.info("继续下一个操作...")
            return None

    async def monitor_operations(self):
        """监控和记录各种操作"""
        operations = [
            ("获取公钥", self.client.get_public_key),
            ("检查余额", self.client.balances, "wallet_address_here"),
            ("盾牌检查", self.client.shield, ["So11111111111111111111111111111111111111112"]),
        ]

        for name, operation, *args in operations:
            self.logger.info(f"执行: {name}")
            result = await self.safe_execute(operation, *args)

            if result is not None:
                self.logger.info(f"✅ {name} 成功完成")
            else:
                self.logger.warning(f"❌ {name} 失败或返回 None")

            await asyncio.sleep(2)  # 操作之间的小延迟

    async def close(self):
        await self.client.close()

# 使用方法
async def run_error_handling_demo():
    handler = ErrorHandler()
    try:
        await handler.monitor_operations()
    finally:
        await handler.close()

# asyncio.run(run_error_handling_demo())

重试机制

import asyncio
import random
from typing import Any, Callable

async def exponential_backoff_retry(
    operation: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> Any:
    """
    使用指数退避进行重试

    参数:
        operation: 要重试的异步函数
        max_retries: 最大重试次数
        base_delay: 基础延迟(秒)
        max_delay: 最大延迟(秒)
        jitter: 添加随机抖动防止雷群效应
    """
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            return await operation()

        except Exception as e:
            last_exception = e

            if attempt == max_retries:
                raise last_exception

            # 使用指数退避计算延迟
            delay = min(base_delay * (2 ** attempt), max_delay)

            # 添加抖动
            if jitter:
                delay *= (0.5 + random.random() * 0.5)

            print(f"❌ 第 {attempt + 1} 次尝试失败: {e}")
            print(f"⏳ {delay:.1f} 秒后重试...")

            await asyncio.sleep(delay)

    raise last_exception

# 使用示例
async def retry_example():
    from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

    client = AsyncUltraApiClient()
    try:
        # 用重试逻辑包装任何操作
        balances = await exponential_backoff_retry(
            lambda: client.balances("wallet_address_here"),
            max_retries=3
        )
        print(f"✅ 获得余额: {balances}")

    finally:
        await client.close()

# asyncio.run(retry_example())

🚀 性能优化

并发处理

import asyncio
from typing import List
from pyjupiter.clients.ultra_api_client import AsyncUltraApiClient

class PerformanceOptimizer:
    def __init__(self, max_concurrent: int = 10):
        self.client = AsyncUltraApiClient()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def concurrent_balance_check(self, addresses: List[str]):
        """并发检查多个地址的余额"""
        async def check_single_balance(address):
            async with self.semaphore:
                try:
                    return {
                        "address": address,
                        "balances": await self.client.balances(address),
                        "status": "success"
                    }
                except Exception as e:
                    return {
                        "address": address,
                        "error": str(e),
                        "status": "error"
                    }

        print(f"🚀 并发检查 {len(addresses)} 个地址...")
        start_time = asyncio.get_event_loop().time()

        tasks = [check_single_balance(addr) for addr in addresses]
        results = await asyncio.gather(*tasks)

        end_time = asyncio.get_event_loop().time()
        duration = end_time - start_time

        successful = sum(1 for r in results if r["status"] == "success")

        print(f"✅ 在 {duration:.2f}s 内完成")
        print(f"📊 成功率: {successful}/{len(addresses)} ({successful/len(addresses)*100:.1f}%)")

        return results

    async def batch_shield_check(self, mint_batches: List[List[str]]):
        """批量检查代币安全性"""
        async def check_batch(batch):
            async with self.semaphore:
                try:
                    return await self.client.shield(batch)
                except Exception as e:
                    return {"error": str(e)}

        print(f"🛡️  处理 {len(mint_batches)} 个盾牌批次...")

        tasks = [check_batch(batch) for batch in mint_batches]
        results = await asyncio.gather(*tasks)

        return results

    async def close(self):
        await self.client.close()

# 使用方法
async def performance_demo():
    optimizer = PerformanceOptimizer(max_concurrent=5)

    try:
        # 演示地址(替换为真实地址)
        addresses = [
            "11111111111111111111111111111111",
            "22222222222222222222222222222222",
            "33333333333333333333333333333333",
            # 添加更多地址
        ]

        # 并发余额检查
        balance_results = await optimizer.concurrent_balance_check(addresses)

        # 批量盾牌检查
        mint_batches = [
            ["So11111111111111111111111111111111111111112", "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"],
            ["Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263"],
        ]

        shield_results = await optimizer.batch_shield_check(mint_batches)

        print("🎯 性能优化演示完成!")

    finally:
        await optimizer.close()

# asyncio.run(performance_demo())

这些示例展示了 Jupiter Python SDK 的真实使用模式和最佳实践。每个示例都是完整的,可以根据您的具体用例进行调整。请记住:

  1. 始终优雅地处理错误
  2. 使用适当的速率限制
  3. 使用完毕后关闭客户端
  4. 为生产环境实施日志记录
  5. 先用小金额测试

有关更详细的 API 文档,请参阅 API 参考文档