恐咖兵糖 / www.ftls.xyz

2022-12-25-matrix-robot.md 2.13 KB
恐咖兵糖 authored 2024-02-24 17:21 . pref: update categories
title: "Matrix Python Robot"
subtitle: ""
slug: "20221225-matrix-robot"
date: 2022-12-24T21:10:40+08:00
lastmod: 2022-12-24T21:10:40+08:00
draft: false
description: ""

tags: ["Tech","Matrix"]
categories: ["Code"]

featuredImage: ""
featuredImagePreview: ""

hiddenFromHomePage: false

lightgallery: false
toc: false

I took a quick look at Matrix's bot SDK and threw together a small bot. It logs in to the Matrix network from Python, and I also added a feature that saves messages to Obsidian.

requirements.txt

nio==3.4.2

If that doesn't work, install the packages directly:

pip install matrix-nio
pip install "matrix-nio[e2e]"
pip install requests
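To check whether the install worked before starting the bot, something like the following could be used. It is only a sketch: `missing_modules` is a helper name made up here, not part of matrix-nio. It relies on the standard library's `importlib.util.find_spec` and on `nio` being the import name that the `matrix-nio` package provides.

```python
from importlib import util


def missing_modules(names):
    """Return the module names from `names` that cannot be imported here."""
    return [name for name in names if util.find_spec(name) is None]


if __name__ == "__main__":
    # "nio" is the import name of matrix-nio; "requests" is used by send_ob.
    missing = missing_modules(["nio", "requests"])
    print("missing:", missing if missing else "none")
```

If the list is not empty, run the matching `pip install` command again.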

bot.py

import asyncio
import json

import requests
from nio import AsyncClient, SyncResponse, RoomMessageText

# Homeserver URL and the bot's account name (placeholder).
async_client = AsyncClient("https://matrix.org", "mybotaccount")

# Messages that start with this prefix are handled by the bot.
response_string = "!bot"

def send_ob(text):
    '''Post text to the Obsidian endpoint and return the response body.'''
    req = requests.post(
        url="https://xxxxxxxx.cn-beijing.fcapp.run/ob/today",
        headers={
            "Content-Type": "application/json",
            "Token": "xxxxxxxxxxxxxx",
        },
        data=json.dumps({"content": text}),
    )
    return req.text

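async def main():
    # Log in with the account password (placeholder).
    response = await async_client.login("xxxxxxxxxx")
    print(response)
    while True:
        # Long-poll the homeserver; the timeout is in milliseconds.
        sync_response = await async_client.sync(30000)
        print(sync_response)  # note that this could be LARGE!
        if not isinstance(sync_response, SyncResponse):
            # sync() returns an error object on failure, which has no rooms.
            await asyncio.sleep(5)
            continue
        joins = sync_response.rooms.join
        for room_id in joins:
            for event in joins[room_id].timeline.events:
                # Only text messages that start with the prefix are handled.
                if isinstance(event, RoomMessageText) and event.body.startswith(response_string):
                    # Strip only the leading prefix, not later occurrences.
                    response_body = event.body[len(response_string):].strip()
                    content = {"body": send_ob(response_body), "msgtype": "m.text"}
                    # content = {"body": response_body, "msgtype": "m.text"}
                    await async_client.room_send(room_id, "m.room.message", content)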


# asyncio.run creates the event loop, runs main(), and closes the loop on exit.
asyncio.run(main())
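A possible refinement of the message handling, as a rough sketch: take the text after the `!bot` prefix by slicing, and run the blocking save in a worker thread with `asyncio.to_thread` (Python 3.9+) so that syncing is not stalled while the HTTP request is in flight. `extract_command`, `handle_message`, and `fake_save` are hypothetical names. `fake_save` merely stands in for `send_ob` so the example runs without a network.

```python
import asyncio
import time
from typing import Callable, Optional

PREFIX = "!bot"  # same trigger prefix as response_string in bot.py


def extract_command(body: str, prefix: str = PREFIX) -> Optional[str]:
    """Return the text after the prefix, or None if the message is not for the bot."""
    if not body.startswith(prefix):
        return None
    # Slice off only the leading prefix, so a later "!bot" inside the note survives.
    return body[len(prefix):].strip()


def fake_save(text: str) -> str:
    """Hypothetical stand-in for send_ob: blocks briefly, then returns a reply."""
    time.sleep(0.1)  # simulates the blocking HTTP round trip
    return f"saved: {text}"


async def handle_message(body: str, save: Callable[[str], str] = fake_save) -> Optional[str]:
    """Parse one message body and run the blocking saver off the event loop."""
    command = extract_command(body)
    if command is None:
        return None
    # asyncio.to_thread runs the blocking call in a worker thread.
    return await asyncio.to_thread(save, command)


if __name__ == "__main__":
    print(asyncio.run(handle_message("!bot buy milk, then run !bot again")))
    print(asyncio.run(handle_message("just chatting")))
```

In the bot itself, `save` would be `send_ob`, and the returned string would go into the `body` of the reply passed to `room_send`.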