ラズパイでCANの情報をモニタし、リアルタイムで車速情報などを表示する

2022/04/06

python プログラミング ラズパイ 設定

前回までの記事ではエンジンの回転数/アクセル・ブレーキ情報/車速情報などをモニタしました。
↓前回記事
今回は下記サイトを参考に燃料噴出量やその他の情報をリアルタイムでモニタしていきたいと思います。
(故障した際の警告灯情報なども見れるみたいですが、これが本当なら解析した人すごいと思います。。。)

※車両のOBDカプラに接続するという行為は通信を妨害してしまい、故障が入ってしまう可能性もあります。
(例えばCANのH-Lを短絡させて通信できなくしてしまったり、余計なCAN情報を流してしまったりなど)

作業する際は細心の注意を払ったうえで、自己責任でお願いします。

ターミナル上に情報を上書きしながら表示する

print文を使ってCANの情報をターミナルに表示すると情報がどんどん流れていってしまい、とても見ずらいです。情報が流れていかないようにターミナル上に値を上書きしながらCANの情報を表示をしていきリアルタイムモニタを実現します。。この機能を実現するために"キャリッジリターン"と"エスケープシーケンス"の2つを使います。

キャリッジリターンとは

CRとは、ASCII文字コード体系に定められた制御コードの一つで、文字の入出力位置の行頭復帰を意味するもの。コードは十進数で13番、16進数で「0D」(0x0D/0Dh)。正規表現では「¥r」と表記する。

簡単に言うと行の先頭に戻る命令の事です。Pythonの場合、printで出力すると改行コードが入っているため、コマンドライン上に表示した値がどんどん流れていってしまいます。

それだと見づらいため改行コードをなくしたうえで、キャリッジリターンを使って先頭行に戻ることで値を上書きしていきます。

キャリッジリターンの動作イメージ
キャリッジリターンの動作イメージ(printの改行コードを削除した場合)


エスケープシーケンスとは

今回は速度情報、水温情報など複数の情報を見たいので、複数行で値を表示したいと思ってます。1行で値を表示する場合はキャリッジリターンだけで問題ないのですが、複数行で値を表示する場合はエスケープシーケンスも使う必要があります。
エスケープシーケンスとは、エスケープコードで始まる画面制御のための文字列です。(中略)
文字列の中では、ESCコードは以下のように書き表します。
¥033 : 8進数で表記
¥x1b : 16進数で表記

エスケープシーケンスの中に"ESC[nA"というコードがあります。こちらはカーソルをn行上に移動するというものです。このエスケープシーケンスとキャリッジリターンを組み合わせて常に先頭から値を表示するようにし、値が流れないようにしていきます。

エスケープシーケンスとキャリッジリターンを組み合わせた場合の動作イメージ(printの最後の改行コードは削除した場合)

テストプログラムと表示結果

2重ループでi,jを回してそれぞれの値を上書きしながら2行で表示していくテストプログラムを作成しました。
for i in range(10):
    for j in range(10):
        print("\r i = %d \n j = %d  \033[1A " %(i,j) ,end=" ") #\033[2A
print("\033[1B")



動作としては先頭カーソルを移動したうえで値を表示→2行表示したら1行上に上がる→先頭にカーソルを移動したうえで値を表示...を繰り返していきます。
一番最後にカーソルを1行下げる処理を追加しています。(これが無いとカーソルの位置が戻らなく、実行後にフォルダ情報?で上書きされてしまうため)

実行した結果下記のように上書きされながら表示されました。
実行した結果

補足ですが、上記はUbuntsu環境で実行しています。windowsの場合はデフォルトではエスケープシーケンスを扱うことが出来ません。詳細は下記引用先を参照お願いします。
なお、エスケープシーケンスは、標準状態のWindowsのコマンドプロンプト(DOS窓)では使用できません(単に“[2J [5;10H"”などの文字列が表示されるだけです)。コマンドプロンプトでエスケープシーケンスを有効にするには、起動時に“ansi.sys”というドライバを読み込ませておく必要があります。
エスケープシーケンスによる画面制御より引用

これでCANの情報を上書きしながらコマンドライン上に表示する準備が整いました。

CANから必要な情報を抜き出す

まず最初にCANの制御情報は1byteないし2byteで16進数表記で表示されています。
このままでは人間が理解することが出来ないため、16進数表記の情報を10進数に変換する関数を作成します。
def cal_hex2dec_2byte(data_name1, data_name2):#16進数の文字列2バイト分を受け取って、10進数に変換する関数
    data_mix = str(data_name1)+str(data_name2)
    data_int = int(data_mix,16)
    return data_int

def cal_hex2dec_1byte(data_name1):#16進数の文字列1バイト分を受け取って、10進数に変換する関数
    data_int = int(data_name1,16)
    return data_int

次にCANの受信部のコードを作成します。
参考にしたリンクのCAN情報を元に必要な情報を抜き出していきます。

受信部のコードとしては下記のようになってます。(全体のコードは最後に記載します)
while msg != None:
    msg = can0.recv(10.0)
    msg_list = [x.strip() for x in str(msg).split()]#空白を削除して配列に格納
    list.append(msg)#データ保存用に受信データをリストに格納 
    if len(msg_list) > 3:#エンジン停止後のECUスリープ前にメッセージがタイムスタンプのみ保存され、msg_list[3]の処理が要素外アクセスエラーが発生するため回避用
        if msg_list[3] == "0158":
            car_speed = cal_hex2dec_2byte(msg_list[7],msg_list[8])*0.01 #車速情報
            distance_after = cal_hex2dec_1byte(msg_list[13])*0.01#距離情報(kmに変換)
            if ((distance_after - distance_before) >= 0 ):#カウンターが255までなので、1周する前の処理
                distance += (distance_after - distance_before)
                distance_before = distance_after
            else:
                distance += ((2.55-distance_before) + distance_after)#カウンターが255までなので、1周したあとの処理
                distance_before = distance_after

        elif msg_list[3] == "013a":
            gas_pedal_open = cal_hex2dec_1byte(msg_list[8])#アクセル開度情報
        
        elif msg_list[3] == "0324":
            water_temp = cal_hex2dec_1byte(msg_list[7])-40#水温情報(51℃以上で警告灯消灯、71℃で暖気終了) 
            fi = cal_hex2dec_2byte(msg_list[9],msg_list[10])*0.10886#燃料噴出量積算値ml(エンジン停止で0)
        elif msg_list[3] == "0164":
            fuel_remain = cal_hex2dec_1byte(msg_list[10])
        if (distance != 0) and (fi != 0):
            fuel_economy = distance/(fi/1000)#燃料噴出量をℓに変換したうえで燃費を求める
        print("\r car_speed: %lf \n distance:%lf \n gas_pedal_open: %lf \n water_temp: %lf  \n fuel_remain : %lf \n FI: %lf \n fuel_economy: %lf \033[6A" %(car_speed, distance, gas_pedal_open, water_temp, fuel_remain, fi, fuel_economy), end =" ")#必要な情報を表示
    else:
        print("\033[6B")#
        print("ECU Sleep")

距離情報については0~255までのリンギングカウンタとなっているため、総走行距離を出すためにカウンタが1周した後の処理も記載しています。
また、インジェクションの燃料噴出量と走行距離から燃費を求める処理も記載してます。

その他の情報についてはコメントを記載しているので、参照お願いします。

下記画像が実行した結果です。
実行結果
※念のための注釈ですが、上記スクショは別の人が運転している際のスクショです。(車速情が出ているため自分で運転中にスクショを撮ったというわけではないという意味です。)

無事車速情報や距離などの情報を表示させることが出来ました。

何となくの感想ですが、コンソールの表示上て40度程度なのに、メーター表示の暖気マーク?が消えていたため水温の表示が遅れているように感じました。

ネットで純正の温度センサーの精度は悪いみたいなのを見たことがあるので、そのせいな気もしています。社外の水温計を付けて入れば比較が出来るのですが、あいにく今の自分の車には付いていないため知り合いで付けている人がいれば確認してみたいと思います。

その他の情報も多少ラグはありそうですが、そんなに気にならなかったので問題なく表示できてそうです。

ソースコード全体

下記が全体のコードです。
ロギングしたデータをファイルに保存する処理も入っているので、不要な方は削除してください。
(2022/07/03 追記)
こちらに記載しているコードはpython-canのバージョンが3.3.4です。python-canのバージョンが4以上で実行するとエラーが発生します。
後ほどソースコードを差し替えますが、差分については下記ブログの追記をご確認お願いします。
#CANを受信するコード
#----------------------------------------------------------------------------------
import os
import can
import pandas as pd
import time
import datetime
import traceback

#変数定義箇所
#----------------------------------------------------------------------------------
def cal_hex2dec_2byte(data_name1, data_name2):#16進数の文字列2バイト分を受け取って、10進数に変換する関数
    data_mix = str(data_name1)+str(data_name2)
    data_int = int(data_mix,16)
    return data_int

def cal_hex2dec_1byte(data_name1):#16進数の文字列1バイト分を受け取って、10進数に変換する関数
    data_int = int(data_name1,16)
    return data_int


#----------------------------------------------------------------------------------

os.system('sudo ip link set can0 type can bitrate 500000')
os.system('sudo ifconfig can0 up')

can0 = can.interface.Bus(channel = 'can0', bustype = 'socketcan_ctypes')# socketcan_native

#msg = can.Message(arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7], extended_id=False)
#ファイル出力用の初期化処理
#----------------------------------------------------------------------------------
dt_now = datetime.datetime.now()
f_name = str(dt_now)+".csv"
file = open(str(dt_now)+"log.txt","w",encoding="utf-8")
print("Start\n",file=file)
print("Start")
#----------------------------------------------------------------------------------

#初期化処理
#----------------------------------------------------------------------------------
df = pd.DataFrame()
loging_time = time.time()
msg = 0#初期化処理
list = []
distance_before = 0
distance = 0
gas_pedal_open = 0
water_temp = 0 
fi = 0
fuel_remain = 0
fuel_economy = 0
car_speed = 0
#----------------------------------------------------------------------------------


while msg != None:
    msg = can0.recv(10.0)
    msg_list = [x.strip() for x in str(msg).split()]#空白を削除して配列に格納
    list.append(msg)#データ保存用に受信データをリストに格納 
    if len(msg_list) > 3:#エンジン停止後のECUスリープ前にメッセージがタイムスタンプのみ保存され、msg_list[3]の処理が要素外アクセスエラーが発生するため回避用
        if msg_list[3] == "0158":
            car_speed = cal_hex2dec_2byte(msg_list[7],msg_list[8])*0.01 #車速情報
            distance_after = cal_hex2dec_1byte(msg_list[13])*0.01#距離情報(kmに変換)
            if ((distance_after - distance_before) >= 0 ):#カウンターが255までなので、1周する前の処理
                distance += (distance_after - distance_before)
                distance_before = distance_after
            else:
                distance += ((2.55-distance_before) + distance_after)#カウンターが255までなので、1周したあとの処理
                distance_before = distance_after

        elif msg_list[3] == "013a":
            gas_pedal_open = cal_hex2dec_1byte(msg_list[8])#アクセル開度情報
        
        elif msg_list[3] == "0324":
            water_temp = cal_hex2dec_1byte(msg_list[7])-40#水温情報(51℃以上で警告灯消灯、71℃で暖気終了) 
            fi = cal_hex2dec_2byte(msg_list[9],msg_list[10])*0.10886#燃料噴出量積算値ml(エンジン停止で0)
        elif msg_list[3] == "0164":
            fuel_remain = cal_hex2dec_1byte(msg_list[10])
        if (distance != 0) and (fi != 0):
            fuel_economy = distance/(fi/1000)#燃料噴出量をℓに変換したうえで燃費を求める
        print("\r car_speed: %lf \n distance:%lf \n gas_pedal_open: %lf \n water_temp: %lf  \n fuel_remain : %lf \n FI: %lf \n fuel_economy: %lf \033[6A" %(car_speed, distance, gas_pedal_open, water_temp, fuel_remain, fi, fuel_economy), end =" ")#必要な情報を表示
    else:
        print("\033[6B")#
        print("ECU Sleep")


loging_time = time.time() -loging_time
if msg is None:
    print("Logging end. File output in progress.\n",file=file)
    print("Logging end. File output in progress.")

    df=pd.DataFrame(list)
    file_out_put_time = time.time()
    df.to_csv(f_name)
    file_out_put_time = time.time() - file_out_put_time
    # print("Finish\n\n",file=file)
    print("Finish_OUTPUT_DATA\n")
    print("START_OUTPUT_LOGING_TIME")
    print("Logging time : "+str(loging_time)+"\n",file=file)
    print("File_output_time : "+str(file_out_put_time)+"\n",file=file)
    print("type:"+str(type(msg)),file=file)
    print(msg,file=file)
    print("type(str):"+str(msg),file=file)
    print(str(msg),file=file)
else:
    print("ERROR",file=file)
    print("File NOT SAVED",file=file)
    print(traceback.format_exc(),file = file)
    
    
file.close()
os.system('sudo ifconfig can0 down')

まとめ

CANの情報から車速情報などをリアルタイムで取得し、表示できるようになりました。
現状の実装だとCANの情報が一度途絶えてしまうと再度プログラムを実行しないとモニタが出来ない状態なので、次回は割り込みなどを駆使してCANが流れたら受信処理を実行できるようにしたいと思います。

また、今回紹介しなかったデータの出力処理ですが、エンジン切ってからCANが来なくなるまで5分程度かかってそうなので、エンジン切ったらデータ出力処理を実施するように変更したいと思います。

自己紹介

はじめまして 社会人になってからバイクやプログラミングなどを始めました。 プログラミングや整備の記事を書いていますが、独学なので間違った情報が多いかもしれません。 間違っている情報や改善点がありましたらコメントしていただけると幸いです。

X(旧Twitter)

フォローお願いします!

ラベル

ブログ アーカイブ

QooQ