# tax-invoice-mcp
**Repository Path**: fapiaoapi/tax-invoice-mcp
## Basic Information
- **Project Name**: tax-invoice-mcp
- **Description**: Electronic invoice MCP; fully digital electronic invoice (数电发票) MCP
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: https://fa-piao.com
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-05-16
- **Last Updated**: 2026-05-16
## Categories & Tags
**Categories**: Uncategorized
**Tags**: MCP, 电子发票, 数电发票, 电子发票MCP, 数电发票MCP
## README
# fa-piao.com MCP Server
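**Turn your AI model into an invoicing assistant.** Through MCP (Model Context Protocol), fa-piao.com connects its electronic invoice and fully digital electronic invoice (数电发票) APIs to model platforms such as Claude, GPT, Cursor, and Cherry Studio, so that invoices can be issued through AI.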
---
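## Table of Contents
- [What Is an MCP Server?](#what-is-an-mcp-server)
- [Core Features](#core-features)
- [Quick Start](#quick-start)
- [MCP Server Configuration](#mcp-server-configuration)
- [Multi-Language Integration Examples](#multi-language-integration-examples)
  - [Python](#python)
  - [Node.js](#nodejs)
  - [Java](#java)
  - [Go](#go)
  - [C#](#csharp)
  - [PHP](#php)
  - [Rust](#rust)
- [Platform Integration Tutorials](#platform-integration-tutorials)
  - [Cherry Studio](#cherry-studio)
  - [Cursor](#cursor)
  - [TRAE](#trae)
- [Typical Use Cases](#typical-use-cases)
- [FAQ](#faq)
- [Related Links](#related-links)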
---
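## What Is an MCP Server?
MCP (Model Context Protocol) is an open standard introduced by Anthropic. It defines how AI models call external tools and APIs in a safe, standardized way.
fa-piao.com is an early adopter and provides an **MCP Server (Streamable HTTP)**, so any MCP-capable model can operate the invoicing system directly:
- **Streamable HTTP** transport with real-time streaming responses
- Complete **Tools definitions**: issue, query, red-letter reversal (红冲), and verify invoices
- **Natural-language interaction**, with no code to write
- Enterprise-grade **API Key permission isolation**
- Works with every MCP-compatible model client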
---
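## Core Features
| Feature | Description |
|------|------|
| 🔌 Streamable HTTP | HTTP-based streaming transport that pushes responses in real time |
| 🛠️ Complete Tools definitions | All invoice operations: issuing, querying, red-letter reversal (红冲), and verification |
| 🔐 Secure authentication | Bearer Token authentication plus TLS 1.3 encrypted transport |
| 🌍 Multi-language support | Python / Node.js / Java / Go / C# / PHP / Rust |
| 🤖 Model compatibility | Claude / GPT / Cursor / Cherry Studio / TRAE |
| 📊 Debug mode | Built-in DEBUG switch with full request/response logging |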
---
## Quick Start
### 1. Get an MCP Token
Register and sign in at [open.fa-piao.com](https://open.fa-piao.com), then apply for an MCP Token.
### 2. Configure the MCP Server
Add the following JSON to your model client's configuration file:
```json
{
  "mcpServers": {
    "tax-invoice-mcp": {
      "type": "streamableHttp",
      "url": "https://mcp.fa-piao.com",
      "headers": {
        "Authorization": "Bearer YOUR_MCP_TOKEN",
        "X-Client-Version": "1.0.1"
      }
    }
  }
}
```
> ⚠️ **Important**: Replace `YOUR_MCP_TOKEN` with your actual MCP Token.
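To keep the token out of files that are shared or committed, the configuration can be generated rather than hand-edited. The following is a minimal sketch, not part of the official tooling: the function name `build_mcp_config` and the environment variable `FAPIAO_MCP_TOKEN` are assumptions made for this illustration.

```python
import json
import os


def build_mcp_config(token: str) -> dict:
    """Return the mcpServers entry from the Quick Start, with the token filled in."""
    if not token or token == "YOUR_MCP_TOKEN":
        raise ValueError("a real MCP Token is required")
    return {
        "mcpServers": {
            "tax-invoice-mcp": {
                "type": "streamableHttp",
                "url": "https://mcp.fa-piao.com",
                "headers": {
                    "Authorization": f"Bearer {token}",
                    "X-Client-Version": "1.0.1",
                },
            }
        }
    }


if __name__ == "__main__":
    # FAPIAO_MCP_TOKEN is a hypothetical variable name chosen for this sketch.
    token = os.environ.get("FAPIAO_MCP_TOKEN", "")
    if token:
        print(json.dumps(build_mcp_config(token), indent=2))
    else:
        print("Set FAPIAO_MCP_TOKEN before generating the configuration.")
```

Paste the printed JSON into the client's configuration file, or merge the `tax-invoice-mcp` entry into an existing `mcpServers` object.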
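### 3. Start Using It
Once configured, you can talk to the model in natural language, for example:
- "Query the invoicing status of taxpayer 91500112MAXXXXX"
- "Verify whether this invoice is authentic"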
---
## MCP Server Configuration
| Setting | Value | Description |
|--------|-----|------|
| Protocol type | `streamableHttp` | Streamable HTTP transport |
| Endpoint | `https://mcp.fa-piao.com` | MCP Server address |
| Authentication | `Bearer Token` | Sent in the `Authorization` header |
| Client version | `1.0.1` | Sent in the `X-Client-Version` header |
### Request Headers
| Header | Required | Description |
|--------|------|------|
| `Authorization` | ✅ | Bearer Token authentication |
| `Content-Type` | ✅ | `application/json` |
| `X-Client-Version` | ✅ | Client version number |
| `Accept` | Required for streaming | `text/event-stream, application/json` |
| `mcp-session-id` | Automatic | Session ID returned by the server |
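The header rules in the table can be collected into one helper. This is a sketch under stated assumptions: `build_headers` and its parameters are names invented for this illustration, and sending `mcp-session-id` back on later requests follows what the client examples in this README do.

```python
from typing import Dict, Optional


def build_headers(token: str,
                  session_id: Optional[str] = None,
                  streaming: bool = True,
                  client_version: str = "1.0.1") -> Dict[str, str]:
    """Assemble the request headers listed in the table above."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "X-Client-Version": client_version,
    }
    if streaming:
        # Required for streaming calls according to the table.
        headers["Accept"] = "text/event-stream, application/json"
    if session_id:
        # Echo the session ID that the server returned earlier.
        headers["mcp-session-id"] = session_id
    return headers
```

The first request of a session passes no `session_id`; later requests pass whatever value the server returned in its `mcp-session-id` response header.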
---
## Multi-Language Integration Examples
All examples use the **JSON-RPC 2.0** protocol and cover two calling modes: streaming and non-streaming.
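As a quick orientation, a JSON-RPC 2.0 request carries `jsonrpc`, `id`, `method`, and `params`, while a notification carries no `id` and expects no reply. The sketch below builds both shapes; the helper names `build_request` and `build_notification` are assumptions for illustration, and the tool name and arguments are the placeholders used elsewhere in this README.

```python
import itertools
import json
from typing import Any, Dict, Optional

_ids = itertools.count(1)  # monotonically increasing request IDs


def build_request(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request, which expects a response."""
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params or {}}


def build_notification(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 notification, which has no id and gets no response."""
    return {"jsonrpc": "2.0", "method": method, "params": params or {}}


if __name__ == "__main__":
    call = build_request("tools/call", {
        "name": "enterprise_query_state",
        "arguments": {"nsrsbh": "91500112MAXXXXX", "username": "1325580xxxx"},
    })
    print(json.dumps(call, ensure_ascii=False))
    print(json.dumps(build_notification("notifications/initialized")))
```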
### Python
Streaming call
```python
import json
import time

import requests

MCP_URL = "https://mcp.fa-piao.com"
MCP_TOKEN = "YOUR_MCP_TOKEN"
DEBUG = True  # print every response and its duration when True

mcp_session_id = None
request_id_counter = 0


def get_next_request_id():
    """Return a monotonically increasing JSON-RPC request ID."""
    global request_id_counter
    request_id_counter += 1
    return request_id_counter


def mcp_call_stream(method, params=None):
    """POST one JSON-RPC message and return the response body as text."""
    global mcp_session_id
    start_time = time.time()
    payload = {
        "jsonrpc": "2.0",
        "method": method,
        "params": params if params is not None else {},
    }
    # JSON-RPC notifications must not carry an id.
    if not method.startswith("notifications/"):
        payload["id"] = get_next_request_id()
    # Encode explicitly so non-ASCII characters are sent as UTF-8.
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {
        "Authorization": f"Bearer {MCP_TOKEN}",
        "Content-Type": "application/json",
        "X-Client-Version": "1.0.1",
        "Accept": "text/event-stream, application/json",
    }
    if mcp_session_id:
        headers["mcp-session-id"] = mcp_session_id
    try:
        resp = requests.post(MCP_URL, data=body, headers=headers,
                             timeout=120, stream=True)
        resp.encoding = "utf-8"
        # Remember the session ID so later requests join the same session.
        response_session_id = resp.headers.get("mcp-session-id")
        if response_session_id:
            mcp_session_id = response_session_id
        content_type = resp.headers.get("Content-Type", "")
        if "text/event-stream" in content_type:
            # SSE response: collect the payload of every "data:" line.
            response_data = []
            for line in resp.iter_lines(decode_unicode=True):
                if not line:
                    continue
                if line.startswith("data:"):
                    data = line[5:].strip()
                    if data == "[DONE]":
                        break
                    response_data.append(data)
            result = "\n".join(response_data)
        else:
            result = resp.text.strip()
        if DEBUG:
            duration_ms = (time.time() - start_time) * 1000
            print(f"[{method}] {duration_ms:.0f} ms: {result}")
        return result
    except Exception as e:
        print(f"[error] {method}: {e}")
        return None


if __name__ == "__main__":
    mcp_call_stream("initialize", {"protocolVersion": "2025-11-25"})
    mcp_call_stream("notifications/initialized", {})
    mcp_call_stream("tools/list", {})
    mcp_call_stream("tools/call", {
        "name": "enterprise_query_state",
        "arguments": {"nsrsbh": "91500112MAXXXXX", "username": "1325580xxxx"}
    })
```
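The streaming client collects `data:` lines one at a time. A slightly stricter way to read a Server-Sent Events body is to group `data:` lines into events, which are separated by blank lines. The sketch below works on an already-received body, so it can be tried without a network connection. `parse_sse_events` is a name assumed for this illustration, and treating `[DONE]` as an end marker follows the client examples in this README rather than the SSE format itself.

```python
from typing import List


def parse_sse_events(text: str) -> List[str]:
    """Return the data payload of each SSE event in text, in order."""
    events: List[str] = []
    data_lines: List[str] = []
    for line in text.splitlines():
        if line == "":
            # A blank line ends the current event.
            if data_lines:
                events.append("\n".join(data_lines))
                data_lines = []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        if line.startswith("data:"):
            value = line[5:]
            if value.startswith(" "):
                value = value[1:]  # drop the single optional space after the colon
            if value == "[DONE]":
                break
            data_lines.append(value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.
    if data_lines:
        events.append("\n".join(data_lines))
    return events


if __name__ == "__main__":
    sample = 'event: message\ndata: {"jsonrpc":"2.0","id":1,"result":{}}\n\ndata: [DONE]\n\n'
    for payload in parse_sse_events(sample):
        print(payload)
```

Each returned string is one JSON-RPC message and can be passed to `json.loads`.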
Non-streaming call
```python
import json

import requests

MCP_URL = "https://mcp.fa-piao.com"
MCP_TOKEN = "YOUR_MCP_TOKEN"

mcp_session_id = None
request_id_counter = 0


def get_next_request_id():
    """Return a monotonically increasing JSON-RPC request ID."""
    global request_id_counter
    request_id_counter += 1
    return request_id_counter


def mcp_call(method, params=None):
    """POST one JSON-RPC request and return the raw response body."""
    global mcp_session_id
    payload = {
        "jsonrpc": "2.0",
        "id": get_next_request_id(),
        "method": method,
        "params": params if params is not None else {},
    }
    headers = {
        "Authorization": f"Bearer {MCP_TOKEN}",
        "X-Client-Version": "1.0.1",
        "Content-Type": "application/json",
    }
    if mcp_session_id:
        headers["mcp-session-id"] = mcp_session_id
    # Encode explicitly so non-ASCII characters are sent as UTF-8.
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    resp = requests.post(MCP_URL, data=body, headers=headers, timeout=120)
    resp.encoding = "utf-8"
    # Remember the session ID so later requests join the same session.
    response_session_id = resp.headers.get("mcp-session-id")
    if response_session_id:
        mcp_session_id = response_session_id
    return resp.text.strip()


if __name__ == "__main__":
    mcp_call("initialize", {"protocolVersion": "2025-11-25"})
    result = mcp_call("tools/call", {
        "name": "enterprise_query_state",
        "arguments": {"nsrsbh": "91500112MAXXXX", "username": "1325580xxxx"}
    })
    # The body is a JSON string; parse it before pretty-printing.
    print(json.dumps(json.loads(result), indent=2, ensure_ascii=False))
```
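The non-streaming client returns the raw response text. In JSON-RPC 2.0 a response carries either a `result` member or an `error` member with a `code` and a `message`, so a thin wrapper can turn failures into exceptions. This is a sketch: `McpError` and `unwrap_response` are names assumed for illustration, and the shape of `result` for each tool is not documented in this README.

```python
import json
from typing import Any


class McpError(Exception):
    """Raised when a JSON-RPC response contains an error member."""

    def __init__(self, code: Any, message: Any) -> None:
        super().__init__(f"JSON-RPC error {code}: {message}")
        self.code = code
        self.message = message


def unwrap_response(raw: str) -> Any:
    """Parse a JSON-RPC 2.0 response and return its result, or raise McpError."""
    message = json.loads(raw)
    if "error" in message:
        err = message["error"]
        raise McpError(err.get("code"), err.get("message"))
    return message.get("result")


if __name__ == "__main__":
    print(unwrap_response('{"jsonrpc":"2.0","id":1,"result":{"ok":true}}'))
    try:
        unwrap_response('{"jsonrpc":"2.0","id":2,"error":{"code":-32601,"message":"Method not found"}}')
    except McpError as exc:
        print(exc)
```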
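### Node.js
Streaming call
```typescript
import axios, { AxiosResponse } from 'axios';

const MCP_URL = 'https://mcp.fa-piao.com';
const MCP_TOKEN = 'YOUR_MCP_TOKEN';
const DEBUG = true;
let mcpSessionId: string | null = null;
let requestIdCounter = 0;

function getNextRequestId(): number {
  requestIdCounter += 1;
  return requestIdCounter;
}

// Collect the payload of every "data:" line in an SSE body.
function parseSseResponse(responseText: string): string {
  const lines = responseText.split(/\r?\n/);
  const responseData: string[] = [];
  for (const line of lines) {
    if (!line.trim()) continue;
    if (line.startsWith('data:')) {
      const data = line.substring(5).trim();
      if (data === '[DONE]') break;
      responseData.push(data);
    }
  }
  return responseData.join('\n');
}

// NOTE: the source is truncated after this signature. The return type and the
// body are a reconstruction that follows the Python streaming example.
export async function mcpCallStream(method: string, params?: any): Promise<string | null> {
  const isNotification = method.startsWith('notifications/');
  // JSON-RPC notifications must not carry an id.
  const payload = { jsonrpc: '2.0', ...(isNotification ? {} : { id: getNextRequestId() }), method, params: params ?? {} };
  const headers: Record<string, string> = {
    Authorization: `Bearer ${MCP_TOKEN}`,
    'Content-Type': 'application/json',
    'X-Client-Version': '1.0.1',
    Accept: 'text/event-stream, application/json',
  };
  if (mcpSessionId) headers['mcp-session-id'] = mcpSessionId;
  try {
    const resp: AxiosResponse<string> = await axios.post(MCP_URL, payload, {
      headers,
      responseType: 'text',
      transformResponse: [(data) => data], // keep the raw body; do not auto-parse JSON
      timeout: 120000,
    });
    const sessionId = resp.headers['mcp-session-id'];
    if (sessionId) mcpSessionId = String(sessionId);
    const contentType = String(resp.headers['content-type'] ?? '');
    const text = String(resp.data);
    const result = contentType.includes('text/event-stream') ? parseSseResponse(text) : text.trim();
    if (DEBUG) console.log(`[${method}]`, result);
    return result;
  } catch (err) {
    console.error(`[error] ${method}:`, err);
    return null;
  }
}
```
Non-streaming call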
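```typescript
import axios, { AxiosResponse } from 'axios';

const MCP_URL = 'https://mcp.fa-piao.com';
const MCP_TOKEN = 'YOUR_MCP_TOKEN';
let mcpSessionId: string | null = null;
let requestId = 0;

function nextRequestId(): number {
  return ++requestId;
}

// NOTE: the source is truncated after this signature. The return type and the
// body are a reconstruction that follows the Python non-streaming example.
// `params` is a JSON string, as in the original signature.
export async function mcpCall(method: string, params?: string): Promise<string> {
  const payload = { jsonrpc: '2.0', id: nextRequestId(), method, params: params ? JSON.parse(params) : {} };
  const headers: Record<string, string> = {
    Authorization: `Bearer ${MCP_TOKEN}`,
    'X-Client-Version': '1.0.1',
    'Content-Type': 'application/json',
  };
  if (mcpSessionId) headers['mcp-session-id'] = mcpSessionId;
  const resp: AxiosResponse<string> = await axios.post(MCP_URL, payload, {
    headers,
    responseType: 'text',
    transformResponse: [(data) => data], // keep the raw body; do not auto-parse JSON
    timeout: 120000,
  });
  const sessionId = resp.headers['mcp-session-id'];
  if (sessionId) mcpSessionId = String(sessionId);
  return String(resp.data).trim();
}
```
### Java
Streaming call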
```java
import java.net.URI;
import java.net.http.*;
import java.io.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class McpStreamClient {
    public static final boolean DEBUG = false;
    private static final String MCP_URL = "https://mcp.fa-piao.com";
    private static final String MCP_TOKEN = "YOUR_MCP_TOKEN";
    // The source is truncated from this point, starting at a
    // `private static final AtomicReference` declaration.
}
```
Non-streaming call
```java
import java.net.URI;
import java.net.http.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class McpClient {
    private static final String MCP_URL = "https://mcp.fa-piao.com";
    private static final String MCP_TOKEN = "YOUR_MCP_TOKEN";
    // The source is truncated from this point, starting at a
    // `private static final AtomicReference` declaration.
}
```
### Go
Streaming call
```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync/atomic"
)

const (
	MCP_URL   = "https://mcp.fa-piao.com"
	MCP_TOKEN = "YOUR_MCP_TOKEN"
	DEBUG     = true
)

var (
	mcpSessionId atomic.Value
	requestId    atomic.Int32
)

func init() {
	mcpSessionId.Store("")
}

func nextRequestId() int32 {
	return requestId.Add(1)
}

// mcpCallStream sends one JSON-RPC message; params must be a JSON object string.
func mcpCallStream(method string, params string) (string, error) {
	var body string
	if strings.HasPrefix(method, "notifications/") {
		// JSON-RPC notifications must not carry an id.
		body = fmt.Sprintf(`{"jsonrpc":"2.0","method":"%s","params":%s}`, method, params)
	} else {
		body = fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"%s","params":%s}`,
			nextRequestId(), method, params)
	}
	req, err := http.NewRequest("POST", MCP_URL, bytes.NewBufferString(body))
	if err != nil {
		return "", fmt.Errorf("failed to create request: %v", err)
	}
	req.Header.Set("Authorization", "Bearer "+MCP_TOKEN)
	req.Header.Set("X-Client-Version", "1.0.1")
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "text/event-stream, application/json")
	sessionId := mcpSessionId.Load().(string)
	if sessionId != "" {
		req.Header.Set("mcp-session-id", sessionId)
	}
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to send request: %v", err)
	}
	defer resp.Body.Close()
	// Remember the session ID so later requests join the same session.
	responseSessionId := resp.Header.Get("mcp-session-id")
	if responseSessionId != "" {
		mcpSessionId.Store(responseSessionId)
	}
	return readStreamResponse(resp.Body, resp.Header.Get("Content-Type"))
}

// readStreamResponse returns the "data:" payloads of an SSE body, one per line,
// or the whole body for any other content type.
func readStreamResponse(body io.Reader, contentType string) (string, error) {
	reader := bufio.NewReader(body)
	var responseBody strings.Builder
	isSSE := strings.Contains(contentType, "text/event-stream")
	if isSSE {
		for {
			line, err := reader.ReadString('\n')
			if err != nil && err != io.EOF {
				return "", err
			}
			// Process the line before checking for EOF, so a final line
			// without a trailing newline is not dropped.
			line = strings.TrimSpace(line)
			if strings.HasPrefix(line, "data:") {
				data := strings.TrimSpace(line[5:])
				if data == "[DONE]" {
					break
				}
				responseBody.WriteString(data)
				responseBody.WriteString("\n")
			}
			if err == io.EOF {
				break
			}
		}
	} else {
		all, err := io.ReadAll(reader)
		if err != nil {
			return "", err
		}
		responseBody.Write(all)
	}
	return strings.TrimSpace(responseBody.String()), nil
}

func main() {
	mcpCallStream("initialize", `{"protocolVersion": "2025-11-25"}`)
	mcpCallStream("notifications/initialized", "{}")
	mcpCallStream("tools/list", "{}")
	result, err := mcpCallStream("tools/call",
		`{"name":"enterprise_query_state","arguments":{"nsrsbh":"91500112MAXXXXX","username":"1325580xxxx"}}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if DEBUG {
		fmt.Println(result)
	}
}
```
Non-streaming call
```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"sync/atomic"
)

const (
	MCP_URL   = "https://mcp.fa-piao.com"
	MCP_TOKEN = "YOUR_MCP_TOKEN"
)

var (
	mcpSessionId atomic.Value
	requestId    atomic.Int32
)

func init() {
	mcpSessionId.Store("")
}

func nextRequestId() int32 {
	return requestId.Add(1)
}

// mcpCall sends one JSON-RPC request; params must be a JSON object string.
func mcpCall(method string, params string) (string, error) {
	currentId := nextRequestId()
	body := fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"%s","params":%s}`,
		currentId, method, params)
	req, err := http.NewRequest("POST", MCP_URL, bytes.NewBufferString(body))
	if err != nil {
		return "", fmt.Errorf("failed to create request: %v", err)
	}
	req.Header.Set("Authorization", "Bearer "+MCP_TOKEN)
	req.Header.Set("X-Client-Version", "1.0.1")
	req.Header.Set("Content-Type", "application/json")
	sessionId := mcpSessionId.Load().(string)
	if sessionId != "" {
		req.Header.Set("mcp-session-id", sessionId)
	}
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to send request: %v", err)
	}
	defer resp.Body.Close()
	// Remember the session ID so later requests join the same session.
	responseSessionId := resp.Header.Get("mcp-session-id")
	if responseSessionId != "" {
		mcpSessionId.Store(responseSessionId)
	}
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("failed to read response: %v", err)
	}
	return string(respBody), nil
}

func main() {
	mcpCall("initialize", `{"protocolVersion": "2025-11-25"}`)
	result, err := mcpCall("tools/call",
		`{"name":"enterprise_query_state","arguments":{"nsrsbh":"91500112MAXXXX","username":"1325580xxxx"}}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(result)
}
```
### C#
Streaming call
```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

namespace Tax.Invoice.Example
{
    public class McpStreamClient
    {
        public static readonly bool DEBUG = true;
        private const string MCP_URL = "https://mcp.fa-piao.com";
        private const string MCP_TOKEN = "YOUR_MCP_TOKEN";
        private static string? mcpSessionId = null;
        private static int requestId = 0;
        private static readonly object lockObj = new object();

        private static int NextRequestId()
        {
            lock (lockObj) { return ++requestId; }
        }

        // The source is truncated from this point, starting at a
        // `public static async Task` method declaration.
    }
}
```
Non-streaming call
```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

namespace Tax.Invoice.Example
{
    public class McpClient
    {
        private const string MCP_URL = "https://mcp.fa-piao.com";
        private const string MCP_TOKEN = "YOUR_MCP_TOKEN";
        private static string? mcpSessionId = null;
        private static int requestId = 0;
        private static readonly object lockObj = new object();

        private static int NextRequestId()
        {
            lock (lockObj) { return ++requestId; }
        }

        // The source is truncated from this point, starting at a
        // `public static async Task` method declaration.
    }
}
```
### PHP
Streaming call
```php
<?php
// NOTE: the opening of this class is missing from the source. Everything down
// to `$body = json_encode([` is reconstructed from how the surviving code uses
// it; `$requestId` is an assumed name.
class McpStreamClient
{
    private static $MCP_URL = 'https://mcp.fa-piao.com';
    private static $MCP_TOKEN = 'YOUR_MCP_TOKEN';
    private static $mcpSessionId = null;
    private static $requestId = 0;

    public static function mcpCallStream($method, $paramsJson)
    {
        $currentId = ++self::$requestId;
        $body = json_encode([
            'jsonrpc' => '2.0',
            'id' => $currentId,
            'method' => $method,
            'params' => json_decode($paramsJson, true)
        ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        $headers = [
            'Authorization: Bearer ' . self::$MCP_TOKEN,
            'X-Client-Version: 1.0.1',
            'Content-Type: application/json',
            'Accept: text/event-stream, application/json'
        ];
        if (self::$mcpSessionId !== null && self::$mcpSessionId !== '') {
            $headers[] = 'mcp-session-id: ' . self::$mcpSessionId;
        }
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => self::$MCP_URL,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $body,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HEADER => true,
            CURLOPT_SSL_VERIFYPEER => true,
            CURLOPT_TIMEOUT => 30,
        ]);
        $response = curl_exec($ch);
        if (curl_errno($ch)) {
            // Read the error message before closing the handle.
            $error = curl_error($ch);
            curl_close($ch);
            throw new Exception("cURL Error: " . $error);
        }
        $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
        curl_close($ch);
        // CURLOPT_HEADER is on, so split the raw response into headers and body.
        $responseHeaders = substr($response, 0, $headerSize);
        $responseBody = trim(substr($response, $headerSize));
        preg_match_all('/^mcp-session-id:\s*(.+)$/im', $responseHeaders, $matches);
        if (!empty($matches[1])) {
            self::$mcpSessionId = trim($matches[1][0]);
        }
        return $responseBody;
    }

    public static function initialize()
    {
        return self::mcpCallStream("initialize", '{"protocolVersion": "2025-11-25"}');
    }

    public static function toolsCall($toolName, $arguments)
    {
        $params = json_encode(['name' => $toolName, 'arguments' => $arguments],
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        return self::mcpCallStream("tools/call", $params);
    }
}

// Usage example
McpStreamClient::initialize();
McpStreamClient::toolsCall("enterprise_query_state", [
    "nsrsbh" => "91500112MAXXXXX",
    "username" => "1325580xxxx"
]);
```
Non-streaming call
```php
<?php
// NOTE: the opening of this class is missing from the source. Everything down
// to `$body = json_encode([` is reconstructed from how the surviving code uses
// it; `$requestId` is an assumed name.
class McpClient
{
    private static $MCP_URL = 'https://mcp.fa-piao.com';
    private static $MCP_TOKEN = 'YOUR_MCP_TOKEN';
    private static $mcpSessionId = null;
    private static $requestId = 0;

    public static function mcpCall($method, $paramsJson)
    {
        $currentId = ++self::$requestId;
        $body = json_encode([
            'jsonrpc' => '2.0',
            'id' => $currentId,
            'method' => $method,
            'params' => json_decode($paramsJson, true)
        ], JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        $headers = [
            'Authorization: Bearer ' . self::$MCP_TOKEN,
            'X-Client-Version: 1.0.1',
            'Content-Type: application/json'
        ];
        if (self::$mcpSessionId !== null && self::$mcpSessionId !== '') {
            $headers[] = 'mcp-session-id: ' . self::$mcpSessionId;
        }
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => self::$MCP_URL,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $body,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HEADER => true,
            CURLOPT_SSL_VERIFYPEER => true,
            CURLOPT_TIMEOUT => 30,
        ]);
        $response = curl_exec($ch);
        if (curl_errno($ch)) {
            // Read the error message before closing the handle.
            $error = curl_error($ch);
            curl_close($ch);
            throw new Exception("cURL Error: " . $error);
        }
        $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
        curl_close($ch);
        // CURLOPT_HEADER is on, so split the raw response into headers and body.
        $responseHeaders = substr($response, 0, $headerSize);
        $responseBody = trim(substr($response, $headerSize));
        preg_match_all('/^mcp-session-id:\s*(.+)$/im', $responseHeaders, $matches);
        if (!empty($matches[1])) {
            self::$mcpSessionId = trim($matches[1][0]);
        }
        return $responseBody;
    }

    public static function initialize()
    {
        return self::mcpCall("initialize", '{"protocolVersion": "2025-11-25"}');
    }

    public static function toolsCall($toolName, $arguments)
    {
        $params = json_encode(['name' => $toolName, 'arguments' => $arguments],
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        return self::mcpCall("tools/call", $params);
    }
}

// Usage example
McpClient::initialize();
McpClient::toolsCall("enterprise_query_state", [
    "nsrsbh" => "91500112MAXXXX",
    "username" => "1325580xxxx"
]);
```
### Rust
Streaming call
```rust
use std::io::{self, BufRead};
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Mutex;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE};

const MCP_URL: &str = "https://mcp.fa-piao.com";
const MCP_TOKEN: &str = "YOUR_MCP_TOKEN";
const CLIENT_VERSION: &str = "1.0.1";

// The source is truncated from this point, inside a `lazy_static!` block that
// declares `MCP_SESSION_ID` as a `Mutex`.
```
Non-streaming call
```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Mutex;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_TYPE};

const MCP_URL: &str = "https://mcp.fa-piao.com";
const MCP_TOKEN: &str = "YOUR_MCP_TOKEN";
const CLIENT_VERSION: &str = "1.0.1";

// The source is truncated from this point, inside a `lazy_static!` block that
// declares `MCP_SESSION_ID` as a `Mutex`.
```
---
## FAQ
### What is MCP (Model Context Protocol)?
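MCP is an open standard introduced by Anthropic that governs communication between AI models and external tools or APIs. The fa-piao.com MCP Server implements the Streamable HTTP transport and supports real-time streaming responses.
### Which model platforms are supported?
Every platform that is compatible with MCP is supported, including Claude Desktop, Cursor, Cherry Studio, TRAE, and Windsurf. If a platform supports MCP, it can use the server directly.
### How is the MCP Server secured?
- API Key authentication: every request must carry a valid Token
- Permission isolation: different API Keys can be granted different operation permissions
- TLS 1.3 encrypted transport
- Request rate limiting to prevent abuse
- Complete audit logs of operations
### How does using the MCP Server differ from calling the API directly?
The MCP Server lets you issue invoices by talking to a model in **natural language**, with no code to write. Both paths call the same underlying invoice API, so functionality and performance are identical. The MCP Server is the better fit for non-technical users and for workflows where speed matters most.
### Is the MCP Server included in every pricing plan?
Yes. The MCP Server is **included in every pricing plan**, including the trial plan priced at 1 fen (¥0.01).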
© 2025 fa-piao.com | Fully Digital E-Invoice API · E-Invoice API · AI Invoicing · MCP Server