IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    如何写一个MCP Server

    Elmagnifico's Blog发表于 2025-04-01 00:00:00
    love 0

    Foreword

    看一下如何写一个MCP服务

    FastMCP

    翻了一下看到了MCP有一个快速的模式,这种方式不需要写那么多前置代码

    # server.py
    from mcp.server.fastmcp import FastMCP
    
    # Create an MCP server
    mcp = FastMCP("Demo")
    
    
    # 通过装饰器快速加入一个工具命令
    # Add an addition tool
    @mcp.tool()
    def add(a: int, b: int) -> int:
        """Add two numbers"""
        # NOTE(review): the docstring is kept verbatim because the registering
        # decorator may expose it to clients as the tool description -- confirm
        # before rewording it.
        total = a + b
        return total
    
    # 这里还通过装饰器,加入了一部分资源
    # 这里的资源可以理解为一个文件或者一段信息,或者说一个特定的内容,这个可以给LLM去获取的
    # Add a dynamic greeting resource
    @mcp.resource("greeting://{name}")
    def get_greeting(name: str) -> str:
        """Get a personalized greeting"""
        # The name is interpolated as-is into the fixed greeting template.
        greeting = "Hello, {}!".format(name)
        return greeting
    

    快速安装,启动服务

    mcp install server.py
    mcp dev server.py
    

    这种方式更接近于你提前告诉了LLM你有一个函数,它的输入输出模式是什么样的,调用起来稍微有点死板

    除了工具和资源的装饰器,还有一个prompt的装饰器,其实就是现在ChatGPT里的各种角色扮演的prompt,只是内置到了mcp里而已

    @mcp.prompt()
    

    Git MCP Server

    https://github.com/modelcontextprotocol/servers/tree/main/src/git

    先看Git的MCP Server,主要是用python写的

    import click
    from pathlib import Path
    import logging
    import sys
    from .server import serve
    
    @click.command()
    @click.option("--repository", "-r", type=Path, help="Git repository path")
    @click.option("-v", "--verbose", count=True)
    def main(repository: Path | None, verbose: int) -> None:
        """MCP Git Server - Git functionality for MCP"""
        # The docstring above is kept verbatim: click uses a command's docstring
        # as its --help text.
        #
        # repository: optional path given with --repository / -r.
        # verbose:    number of -v flags.  ``count=True`` makes click pass an
        #             int (0 when the flag is absent), so this is a counter,
        #             not a boolean.
        import asyncio

        # Map the -v count to a log level: none -> WARN, -v -> INFO, -vv+ -> DEBUG.
        if verbose >= 2:
            logging_level = logging.DEBUG
        elif verbose == 1:
            logging_level = logging.INFO
        else:
            logging_level = logging.WARN

        # Logs go to stderr (presumably because stdout carries the stdio
        # transport used by serve() -- confirm).
        logging.basicConfig(level=logging_level, stream=sys.stderr)
        asyncio.run(serve(repository))
    
    # Start the CLI only when this file is executed directly, not when imported.
    if __name__ == "__main__":
        main()
    
    

    这里主要是支持通过命令行调用工具,启动一个git仓库的管理服务

    git仓库操作的具体实现主要看这里的server.py

    import logging
    from pathlib import Path
    # 接口数据类型显式定义模块
    from typing import Sequence
    # 这里主要是mcp相关的server接口定义
    from mcp.server import Server
    from mcp.server.session import ServerSession
    from mcp.server.stdio import stdio_server
    from mcp.types import (
        ClientCapabilities,
        TextContent,
        Tool,
        ListRootsResult,
        RootsCapability,
    )
    from enum import Enum
    # 主要是依赖git模块实现git操作
    import git
    # 基础的数据验证模块
    from pydantic import BaseModel
    
    
    # 这里主要是对各个命令的参数基类进行定义
    # Arguments of the "git_status" tool (working tree status).
    class GitStatus(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
    
    # Arguments of the "git_diff_unstaged" tool (changes not yet staged).
    class GitDiffUnstaged(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
    
    # Arguments of the "git_diff_staged" tool (changes staged for commit).
    class GitDiffStaged(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
    
    # Arguments of the "git_diff" tool (differences between branches or commits).
    class GitDiff(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        target: str  # what to diff against; passed straight to `git diff`
    
    # Arguments of the "git_commit" tool (records changes to the repository).
    class GitCommit(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        message: str  # commit message
    
    # Arguments of the "git_add" tool (adds file contents to the staging area).
    class GitAdd(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        files: list[str]  # files to stage; handed to the repository index
    
    # Arguments of the "git_reset" tool (unstages all staged changes).
    class GitReset(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
    
    # Arguments of the "git_log" tool (shows the commit logs).
    class GitLog(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        max_count: int = 10  # maximum number of commits to list
    
    # Arguments of the "git_create_branch" tool.
    class GitCreateBranch(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        branch_name: str  # name of the branch to create
        base_branch: str | None = None  # ref to branch from; None means the active branch
    
    # Arguments of the "git_checkout" tool (switches branches).
    class GitCheckout(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        branch_name: str  # branch to switch to
    
    # Arguments of the "git_show" tool (shows the contents of a commit).
    class GitShow(BaseModel):
        repo_path: str  # filesystem path of the repository to operate on
        revision: str  # revision that identifies the commit to show
    
    # Arguments of the "git_init" tool (initializes a new repository).
    class GitInit(BaseModel):
        repo_path: str  # directory in which to initialize the repository
    
    # 这里定义工具类本身
    class GitTools(str, Enum):
        """Names of the tools the server advertises and dispatches on.

        Members are also ``str`` instances, so they compare equal to the plain
        tool-name strings received by the tool-call handler.
        """

        STATUS = "git_status"
        DIFF_UNSTAGED = "git_diff_unstaged"
        DIFF_STAGED = "git_diff_staged"
        DIFF = "git_diff"
        COMMIT = "git_commit"
        ADD = "git_add"
        RESET = "git_reset"
        LOG = "git_log"
        CREATE_BRANCH = "git_create_branch"
        CHECKOUT = "git_checkout"
        SHOW = "git_show"
        INIT = "git_init"
    
    # 这里是各个函数的具体实现
    def git_status(repo: git.Repo) -> str:
        """Return the output of ``git status`` for *repo*."""
        status_text = repo.git.status()
        return status_text
    
    def git_diff_unstaged(repo: git.Repo) -> str:
        """Return the output of a bare ``git diff`` (changes not yet staged)."""
        unstaged_changes = repo.git.diff()
        return unstaged_changes
    
    def git_diff_staged(repo: git.Repo) -> str:
        """Return the output of ``git diff --cached`` (changes staged for commit)."""
        staged_changes = repo.git.diff("--cached")
        return staged_changes
    
    def git_diff(repo: git.Repo, target: str) -> str:
        """Return the ``git diff`` output of the working tree against *target*.

        Args:
            repo: Repository to run the diff in.
            target: Branch, commit or other revision to compare against.

        Returns:
            The text produced by ``git diff <target>``.

        Raises:
            ValueError: If *target* starts with ``-``.
        """
        # `target` comes from a tool call and is forwarded to the git command
        # line, where a leading '-' would be parsed as an option (for example
        # `--output=<file>`) instead of a revision.  Refuse such values.
        if target.startswith("-"):
            raise ValueError(f"Invalid target: '{target}' - cannot start with '-'")
        return repo.git.diff(target)
    
    def git_commit(repo: git.Repo, message: str) -> str:
        """Commit the repository index with *message* and report the new hash."""
        new_commit = repo.index.commit(message)
        sha = new_commit.hexsha
        return f"Changes committed successfully with hash {sha}"
    
    def git_add(repo: git.Repo, files: list[str]) -> str:
        """Stage *files* in the repository index and return a confirmation."""
        index = repo.index
        index.add(files)
        return "Files staged successfully"
    
    def git_reset(repo: git.Repo) -> str:
        """Reset the repository index and return a confirmation message."""
        index = repo.index
        index.reset()
        return "All staged changes reset"
    
    def git_log(repo: git.Repo, max_count: int = 10) -> list[str]:
        """Describe up to *max_count* commits yielded by ``repo.iter_commits``.

        Returns one formatted text block per commit (hash, author, date and
        message), in the order the commits are yielded.
        """
        recent = list(repo.iter_commits(max_count=max_count))
        return [
            f"Commit: {entry.hexsha}\n"
            f"Author: {entry.author}\n"
            f"Date: {entry.authored_datetime}\n"
            f"Message: {entry.message}\n"
            for entry in recent
        ]
    
    def git_create_branch(repo: git.Repo, branch_name: str, base_branch: str | None = None) -> str:
        """Create *branch_name* and return a confirmation message.

        The new branch starts from ``repo.refs[base_branch]`` when a base is
        given, otherwise from the currently active branch.
        """
        base = repo.refs[base_branch] if base_branch else repo.active_branch
        repo.create_head(branch_name, base)
        return f"Created branch '{branch_name}' from '{base.name}'"
    
    def git_checkout(repo: git.Repo, branch_name: str) -> str:
        """Switch *repo* to *branch_name* and return a confirmation message.

        Args:
            repo: Repository to operate on.
            branch_name: Name of the branch to check out.

        Raises:
            ValueError: If *branch_name* starts with ``-``.
        """
        # `branch_name` comes from a tool call and is forwarded to the git
        # command line, where a leading '-' would be parsed as an option rather
        # than a branch.  Refuse such values (this also rejects the bare "-"
        # shorthand).
        if branch_name.startswith("-"):
            raise ValueError(
                f"Invalid branch name: '{branch_name}' - cannot start with '-'"
            )
        repo.git.checkout(branch_name)
        return f"Switched to branch '{branch_name}'"
    
    def git_init(repo_path: str) -> str:
        """Initialize a Git repository at *repo_path* (created if missing).

        Never raises: any failure is reported in the returned message instead.
        """
        try:
            created = git.Repo.init(path=repo_path, mkdir=True)
            outcome = f"Initialized empty Git repository in {created.git_dir}"
        except Exception as exc:
            # Deliberate best effort: the caller receives the error as text.
            outcome = f"Error initializing repository: {exc!s}"
        return outcome
    
    def git_show(repo: git.Repo, revision: str) -> str:
        """Return a commit's metadata followed by its patch.

        The patch is taken against the first parent; a commit without parents
        is diffed against ``git.NULL_TREE``.

        Args:
            repo: Repository that contains the commit.
            revision: Revision resolved through ``repo.commit``.

        Returns:
            The header (hash, author, date, message) followed, for every changed
            file, by a ``---``/``+++`` line pair and the patch text if there is one.
        """
        commit = repo.commit(revision)
        output = [
            f"Commit: {commit.hexsha}\n"
            f"Author: {commit.author}\n"
            f"Date: {commit.authored_datetime}\n"
            f"Message: {commit.message}\n"
        ]
        if commit.parents:
            parent = commit.parents[0]
            diff = parent.diff(commit, create_patch=True)
        else:
            diff = commit.diff(git.NULL_TREE, create_patch=True)
        for d in diff:
            output.append(f"\n--- {d.a_path}\n+++ {d.b_path}\n")
            patch = d.diff
            if patch is None:
                # No patch text for this entry; keep only the path header.
                continue
            if isinstance(patch, bytes):
                # File contents are not guaranteed to be UTF-8; substitute
                # undecodable bytes instead of failing the whole call.
                patch = patch.decode("utf-8", errors="replace")
            output.append(patch)
        return "".join(output)
    

    下面是具体的serve实现

    async def serve(repository: Path | None) -> None:
        """Run the "mcp-git" server on the stdio streams.

        Registers a tool-listing handler and a tool-call handler on a ``Server``
        instance, then hands the stdio read/write streams to ``server.run``.

        Args:
            repository: Optional repository path from the command line. When it
                is given but cannot be opened as a Git repository, an error is
                logged and the function returns without starting the server.
        """
        logger = logging.getLogger(__name__)

        if repository is not None:
            try:
                # Opening the path is only a validity check; the Repo object is discarded.
                git.Repo(repository)
                logger.info(f"Using repository at {repository}")
            except git.InvalidGitRepositoryError:
                logger.error(f"{repository} is not a valid Git repository")
                return
        # Create the server object that the handlers below are registered on.
        server = Server("mcp-git")

        # Handlers are bound to the server through its decorators.
        @server.list_tools()
        async def list_tools() -> list[Tool]:
            return [
                # The description is the main thing that tells the model what a
                # tool is for; inputSchema is derived from the matching argument model.
                Tool(
                    name=GitTools.STATUS,
                    description="Shows the working tree status",
                    inputSchema=GitStatus.schema(),
                ),
                Tool(
                    name=GitTools.DIFF_UNSTAGED,
                    description="Shows changes in the working directory that are not yet staged",
                    inputSchema=GitDiffUnstaged.schema(),
                ),
                Tool(
                    name=GitTools.DIFF_STAGED,
                    description="Shows changes that are staged for commit",
                    inputSchema=GitDiffStaged.schema(),
                ),
                Tool(
                    name=GitTools.DIFF,
                    description="Shows differences between branches or commits",
                    inputSchema=GitDiff.schema(),
                ),
                Tool(
                    name=GitTools.COMMIT,
                    description="Records changes to the repository",
                    inputSchema=GitCommit.schema(),
                ),
                Tool(
                    name=GitTools.ADD,
                    description="Adds file contents to the staging area",
                    inputSchema=GitAdd.schema(),
                ),
                Tool(
                    name=GitTools.RESET,
                    description="Unstages all staged changes",
                    inputSchema=GitReset.schema(),
                ),
                Tool(
                    name=GitTools.LOG,
                    description="Shows the commit logs",
                    inputSchema=GitLog.schema(),
                ),
                Tool(
                    name=GitTools.CREATE_BRANCH,
                    description="Creates a new branch from an optional base branch",
                    inputSchema=GitCreateBranch.schema(),
                ),
                Tool(
                    name=GitTools.CHECKOUT,
                    description="Switches branches",
                    inputSchema=GitCheckout.schema(),
                ),
                Tool(
                    name=GitTools.SHOW,
                    description="Shows the contents of a commit",
                    inputSchema=GitShow.schema(),
                ),
                Tool(
                    name=GitTools.INIT,
                    description="Initialize a new Git repository",
                    inputSchema=GitInit.schema(),
                )
            ]
        # Collect candidate repository paths: the client's roots that open as
        # Git repositories, followed by the command-line repository (if any).
        # NOTE(review): nothing inside this function calls list_repos.
        async def list_repos() -> Sequence[str]:
            async def by_roots() -> Sequence[str]:
                if not isinstance(server.request_context.session, ServerSession):
                    raise TypeError("server.request_context.session must be a ServerSession")

                # Clients that do not declare the roots capability contribute no repositories.
                if not server.request_context.session.check_client_capability(
                    ClientCapabilities(roots=RootsCapability())
                ):
                    return []

                roots_result: ListRootsResult = await server.request_context.session.list_roots()
                logger.debug(f"Roots result: {roots_result}")
                repo_paths = []
                for root in roots_result.roots:
                    path = root.uri.path
                    try:
                        # Keep only the roots that open as Git repositories.
                        git.Repo(path)
                        repo_paths.append(str(path))
                    except git.InvalidGitRepositoryError:
                        pass
                return repo_paths

            def by_commandline() -> Sequence[str]:
                return [str(repository)] if repository is not None else []

            cmd_repos = by_commandline()
            root_repos = await by_roots()
            return [*root_repos, *cmd_repos]

        # Every tool invocation is routed through the call_tool handler.
        @server.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[TextContent]:
            # `name` is the tool being invoked and `arguments` is its parameter
            # dict; "repo_path" is read unconditionally, for every tool.
            repo_path = Path(arguments["repo_path"])

            # Handle git init separately since it doesn't require an existing repo
            if name == GitTools.INIT:
                result = git_init(str(repo_path))
                return [TextContent(
                    type="text",
                    text=result
                )]

            # For all other commands, we need an existing repo
            repo = git.Repo(repo_path)

            # Dispatch on the tool name to the matching helper.
            match name:
                case GitTools.STATUS:
                    status = git_status(repo)
                    # Results are returned as a list of TextContent items.
                    return [TextContent(
                        type="text",
                        text=f"Repository status:\n{status}"
                    )]

                case GitTools.DIFF_UNSTAGED:
                    diff = git_diff_unstaged(repo)
                    return [TextContent(
                        type="text",
                        text=f"Unstaged changes:\n{diff}"
                    )]

                case GitTools.DIFF_STAGED:
                    diff = git_diff_staged(repo)
                    return [TextContent(
                        type="text",
                        text=f"Staged changes:\n{diff}"
                    )]

                case GitTools.DIFF:
                    diff = git_diff(repo, arguments["target"])
                    return [TextContent(
                        type="text",
                        text=f"Diff with {arguments['target']}:\n{diff}"
                    )]

                case GitTools.COMMIT:
                    result = git_commit(repo, arguments["message"])
                    return [TextContent(
                        type="text",
                        text=result
                    )]

                case GitTools.ADD:
                    result = git_add(repo, arguments["files"])
                    return [TextContent(
                        type="text",
                        text=result
                    )]

                case GitTools.RESET:
                    result = git_reset(repo)
                    return [TextContent(
                        type="text",
                        text=result
                    )]

                case GitTools.LOG:
                    log = git_log(repo, arguments.get("max_count", 10))
                    return [TextContent(
                        type="text",
                        text="Commit history:\n" + "\n".join(log)
                    )]

                case GitTools.CREATE_BRANCH:
                    result = git_create_branch(
                        repo,
                        arguments["branch_name"],
                        arguments.get("base_branch")
                    )
                    return [TextContent(
                        type="text",
                        text=result
                    )]

                case GitTools.CHECKOUT:
                    result = git_checkout(repo, arguments["branch_name"])
                    return [TextContent(
                        type="text",
                        text=result
                    )]

                case GitTools.SHOW:
                    result = git_show(repo, arguments["revision"])
                    return [TextContent(
                        type="text",
                        text=result
                    )]

                case _:
                    raise ValueError(f"Unknown tool: {name}")

        # Build the initialization options, then serve on the stdio streams.
        options = server.create_initialization_options()
        async with stdio_server() as (read_stream, write_stream):
            await server.run(read_stream, write_stream, options, raise_exceptions=True)
    
    

    看起来结构还是比较清晰的

    # Abridged outline of serve(): the handler bodies are elided with "...", and
    # the creation of `server` and the registering decorators are not shown.
    async def serve(repository: Path | None) -> None:
        # Register capabilities: report the available tools
        async def list_tools() -> list[Tool]:
        	...

        # Invoke capabilities: run the tool selected by name
        async def call_tool(name: str, arguments: dict) -> list[TextContent]:
        	...
        # Start the server on the stdio streams
        options = server.create_initialization_options()
        async with stdio_server() as (read_stream, write_stream):
            await server.run(read_stream, write_stream, options, raise_exceptions=True)
    
    

    Filesystem MCP Server

    https://github.com/modelcontextprotocol/servers/tree/main/src/filesystem

    有了上面的python的基础,再看Filesystem MCP Server是用nodejs实现的,虽然是ts代码,但是总体逻辑一样的

    image-20250325180641205

    Summary

    MCP Server内容大概是这些,后续可能随着发展这个还会有所变化

    Quote

    https://github.com/liaokongVFX/MCP-Chinese-Getting-Started-Guide

    https://github.com/modelcontextprotocol/python-sdk?tab=readme-ov-file#quickstart



沪ICP备19023445号-2号
友情链接