首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Python之python-can

  • 23-11-14 09:53
  • 4771
  • 6650
blog.csdn.net

Python之python-can

文章目录

  • Python之python-can
    • 1、python-can基本
    • 2、python-can 安装
    • 3、配置
      • 3.1 代码中直接配置
      • 3.2 使用配置文件配置
      • 3.3 使用环境变量配置
    • 4、基本使用
      • 4.1发送单帧报文到总线
      • 4.2发送周期报文到总线
      • 4.3接收总线的can报文
    • 5、 使用Listener和NotifIer
    • 6、bus总线设置过滤报文
    • 7、提供的脚本
      • 7.1 can.logger
      • 7.2 can.player
      • 7.3 can.viewer
      • 7.4 can.logconvert

1、python-can基本

python-can 库为 Python 提供控制器局域网支持,为不同的硬件设备提供通用抽象,以及一套用于在 CAN 总线上发送和接收消息的实用程序。

python-can 可以在任何 Python 运行的地方运行; 从 CAN 的高功率计算机到 USB 设备,再到运行 linux 的低功率设备,例如 BeagleBone 或 RaspberryPi。

更具体地说,该库的一些示例用途:

被动记录 CAN 总线上发生的情况。 例如,使用 OBD-II 端口监控商用车辆。
测试通过 CAN 交互的硬件。 在现代汽车、摩托车、船只甚至轮椅中发现的模块已经使用这个库从 Python 中测试了组件。
在回路中对新的硬件模块或软件算法进行原型设计。 轻松与现有总线交互。
创建虚拟模块以原型 CAN 总线通信。

2、python-can 安装

使用pip安装

pip install python-can
  • 1

同时根据使用的硬件设备需要安装对应的驱动等,参照Installation — python-can 4.0.0 documentation

3、配置

3.1 代码中直接配置

can对象公开了一个rc字典,可用于设置interface和channel

import can
can.rc['interface'] = 'neovi'  # 配置硬件类型
can.rc['channel'] = 2        # 配置通道,根据具体的硬件,int或者str
can.rc['bitrate'] = 500000   # 波特率


from can.interface import Bus

bus = Bus()   # 使用rc字典中的配置实例化can
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

也可以在代码中直接指定接口和通道实例化can

import can

bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=500000)

bus2 = bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
  • 1
  • 2
  • 3
  • 4
  • 5

3.2 使用配置文件配置

在 Linux 系统上,配置文件在以下路径中搜索:

~/can.conf

/etc/can.conf

$HOME/.can

$HOME/.canrc
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在 Windows 系统上,配置文件在以下路径中搜索:

%USERPROFILE%/can.conf

can.ini(当前工作目录)

%APPDATA%/can.ini

ps:%USERPROFILE%目录为 C:Users用户名

%APPDATA% 目录为 C:Users用户名AppDataRoaming
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

配置文件可设置默认接口和通道如下:

[default]
interface = 
channel = 
bitrate = 
  • 1
  • 2
  • 3
  • 4

同时也可以添加其他节点如下:

[HS]

# All the values from the 'default' section are inherited

channel = 
bitrate = 

[MS]

# All the values from the 'default' section are inherited

channel = 
bitrate = 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

使用配置文件的配置实例化can

from can.interface import Bus

bus = Bus()    # 使用默认(default)配置
hs_bus = Bus(context='HS')        # 使用HS节点配置
ms_bus = Bus(context='MS')        # 使用MS节点配置
  • 1
  • 2
  • 3
  • 4
  • 5

3.3 使用环境变量配置

可以设置以下环境变量

CAN_INTERFACE                设置接口类型

CAN_CHANNEL                  设置通道

CAN_BITRATE                  设置波特率

CAN_CONFIG   

CAN_CONFIG                   允许使用 JSON 设置任何总线配置,例如:

CAN_CONFIG={"receive_own_messages": true, "fd": true}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4、基本使用

4.1发送单帧报文到总线

实例化一条bus总线, 使用can.Message()类创建一条message,bus调用send函数将创建的message实例发送到总线。

import can

def send_one():

    # this uses the default configuration (for example from the config file)
    # see https://python-can.readthedocs.io/en/stable/configuration.html
    #bus = can.interface.Bus()
     
    # Using specific buses works similar:
    bus = can.interface.Bus(bustype='bmcan', channel=0, bitrate=500000, data_bitrate=2000000, tres=True)
    # bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)
    # bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
    # bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=250000)
    # bus = can.interface.Bus(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
    # ...
     
    msg = can.Message(arbitration_id=0xc0ffee,
                      data=[0, 25, 0, 1, 3, 1, 4, 1],
                      is_extended_id=True, )
     
    try:
        bus.send(msg, timeout=None)
        print("Message sent on {}".format(bus.channel_info))
    except can.CanError:
        print("Message NOT sent")
     
    bus.shutdown()

if __name__ == '__main__':
    send_one()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

4.2发送周期报文到总线

以下实例分别演示简单发送周期报文,发送定时的周期报文,和发送周期报文过程中修改发送的内容。

注意:在发送定时的周期报文时,达到超时时间任务结束后,task将仍旧由总线跟踪。除非在创建任务时设置参数store_task=False,或者调用stop()函数结束任务task.stop()。

import time
import can

def simple_periodic_send(bus):
    """
    发送一条周期为200ms的周期报文到总线
    2s后停止发送报文
    """
    print("Starting to send a message every 200ms for 2s")
    msg = can.Message(arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6], is_extended_id=False)
    task = bus.send_periodic(msg, 0.20)
    assert isinstance(task, can.CyclicSendTaskABC)
    time.sleep(2)
    task.stop()
    print("stopped cyclic send")

def limited_periodic_send(bus):
    """
    发送一条有时间限制的周期报文到总线,达到超时时间后,停止任务
    """
    print("Starting to send a message every 200ms for 1s")
    msg = can.Message(arbitration_id=0x12345678, data=[0, 0, 0, 0, 0, 0], is_extended_id=True)
    task = bus.send_periodic(msg, 0.20, 1, store_task=False)
    if not isinstance(task, can.LimitedDurationCyclicSendTaskABC):
        print("This interface doesn't seem to support a ")
        task.stop()
        return

    time.sleep(2)

def test_periodic_send_with_modifying_data(bus):
    """
    在发送周期报文过程中修改发送的数据
    """
    print("Starting to send a message every 200ms. Initial data is ones")
    msg = can.Message(arbitration_id=0x0cf02200, data=[1, 1, 1, 1])
    task = bus.send_periodic(msg, 0.20)
    if not isinstance(task, can.ModifiableCyclicTaskABC):
        print("This interface doesn't seem to support modification")
        task.stop()
        return
    time.sleep(2)
    print("Changing data of running task to begin with 99")
    msg.data[0] = 0x99
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")
    print("Changing data of stopped task to single ff byte")
    msg.data = bytearray([0xff])
    msg.dlc = 1
    task.modify_data(msg)
    time.sleep(1)
    print("starting again")
    task.start()
    time.sleep(1)
    task.stop()
    print("done")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

4.3接收总线的can报文

以下示例演示使用recv()函数接收来自总线的can 报文,然后打印报文。

import can

def receive_all():

    bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=500000)
     
    print('Waiting for RX CAN messages ...')
    try:
        while True:
            msg = bus.recv(1)
            if msg is not None:
                print(msg)
    except KeyboardInterrupt:
        pass

 
if __name__ == "__main__":
    receive_all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

5、 使用Listener和NotifIer

Notifier 对象用作总线的消息分发器。Notifier 创建一个线程来从总线读取消息并将它们分发给listeners。

Listener 类是任何希望注册以接收总线上新消息通知的对象的“抽象”基类。通常使用是需继承can.Listener。重写on_message_received()方法,以实现收到特定的消息的处理逻辑。

以下示例,实现MyListener处理收到message 0x123时,发送message 0x456到总线,打印message的回调函数,创建一个日志记录器,添加到listeners列表。

实例化一个Notifier对象,添加监听的bus总线,绑定listeners列表。当Notifier对象收到bus总线的任意消息,则会将该消息分发listeners列表中的各个监听器。

Notifier对象使用stop()函数以结束监听bus。

有一些侦听器python-can已经提供 ,其中一些允许将消息写入文件,例如can.Logger。

import can
from can.message import Message

class MyListener(can.Listener):
    def __init__(self):
        super(MyListener, self).__init__()

    def on_message_received(self, msg: Message) -> None:
        """
        example
        when receive message 0x123,transmit message 0x456
        """
        if msg.arbitration_id == 0x123:
            transmit_msg = can.Message(arbitration_id=0x456,
                                       dlc=8,
                                       data=[0 for _ in range(8)],
                                       is_extended_id=False)
            bus.send(transmit_msg)

 


def print_msg(msg):
    print(msg)


if __name__ == "__main__":
    bus = can.interface.Bus('virtual_ch', bustype='virtual')
    logger = can.Logger("logfile.asc")  # save log to asc file
    listeners = [
        print_msg,  # Callback function, print the received messages
        logger,  # save received messages to asc file
        MyListener   # my listener
    ]
    notifier = can.Notifier(bus, listeners)
    running = True
    while running:
        input()
        running = False

    # It's important to stop the notifier in order to finish the writting of asc file
    notifier.stop()
    # stops the bus
    bus.shutdown()


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

6、bus总线设置过滤报文

可以使用set_filters()设置过滤报文,对bus接收到的所有报文进行过滤,返回过滤器匹配的消息。

set_filters()参数为一个可迭代的字典,每个字典都包含一个“can_id”、一个“can_mask”和一个可选的“extended”键。

[{"can_id": 0x11, "can_mask": 0x21, "extended": False}]
  • 1

过滤器会匹配 & can_mask == can_id & can_mask的所有消息。如果extended 参数也设置了,则仅会匹配 == extended的消息。否则,它仅根据仲裁 ID 和掩码匹配每条消息。

bus = can.interface.Bus('virtual_ch', bustype='virtual')
bus.set_filters([{"can_id": 0x123, "can_mask": 0xFFFF, "extended": False}])
  • 1
  • 2

7、提供的脚本

python-can已经提供一些可直接使用的脚本

参照Scripts — python-can 4.0.0 documentation

7.1 can.logger

can.logger 用于记录CAN log,将消息打印到标准输出或给定文件

可以使用python -m can.logger 或者 can_logger.py 的命令行方式调用

7.2 can.player

can.player 用于回放记录的can 报文日志文件

7.3 can.viewer

一个带有GUI界面的简单的can 总线 报文消息的查看器

7.4 can.logconvert

can.logconvert用于日志文件格式的转换, 将日志文件从一种格式转换为另一种格式

《AUTOSAR谱系分解(ETAS工具链)》之总目录

注:本文转载自blog.csdn.net的PlutoZuo的文章"https://blog.csdn.net/PlutoZuo/article/details/133637707"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top