


rtc_handle

A tool that provides an environment for freely operating RT components (RTCs) of RT middleware (OpenRTM-aist, RTM) from the user's Python environment.

GitHub site (updated from time to time): rtmtools

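For usage, see README.md on GitHub, the ipnb (IPython notebook) files in rtc_handle_example/script, and the html files generated from them.
At present, basic.html is provided as an introduction to basic usage. More material is planned.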

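The manuals for the first edition are listed below. Some details differ, but the basic usage is the same, so please use them as a reference.

First-edition manuals: (RT Middleware Contest 2008, Japan Robot Association Award winner)

The following was generated from an ipnb and converted to DokuWiki format.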


  • this ipnb shows a basic usage of rtc_handle.py
  • precondition: rtcs (cin and cout) are prelaunched separately
  • you can monitor the behavior of the system with openrtp

you can access and control rtcs of OpenRTM-aist (written in any language, e.g. c++, python, java) by using rtc_handle.py

#!/usr/bin/env python
# -*- Python -*-
 
import sys
import time
import subprocess

set up the user environment:

  • path for rtc_handle
  • path for rtcs and user tools
  • nameservers

you may provide a file (e.g. set_env.py) for this.

#
# set up user environment
#  RtmToolsDir, MyRtcDir, etc.
#
# from set_env import *   : you may provide a setup file like this
#
RtmToolsDir="../.."
MyRtcDir=".."
NS0="localhost:9876"
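
a setup file of the kind mentioned above could look roughly like the following. this is only a sketch: the environment-variable names (RTM_TOOLS_DIR, MY_RTC_DIR, RTM_NS0) are hypothetical and not part of rtc_handle, and the default values are simply the ones used in this notebook.

```python
# set_env.py: hypothetical setup file, to be used as "from set_env import *"
import os

# the environment-variable names below are assumptions made for this sketch;
# when they are not set, the values used in this notebook are taken instead
RtmToolsDir = os.environ.get("RTM_TOOLS_DIR", "../..")   # path for rtc_handle
MyRtcDir = os.environ.get("MY_RTC_DIR", "..")            # path for rtcs and user tools
NS0 = os.environ.get("RTM_NS0", "localhost:9876")        # nameserver (host:port)
```

keeping these values in one file lets several scripts share the same paths and nameserver address.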

path is modified temporarily to import tools

stubs for rtc service ports might be imported (this will be explained in another example).

#
# import user tools
#
sys.path.append(".")
save_path = sys.path[:]
sys.path.append(RtmToolsDir+'/rtc_handle')
from rtc_handle import *
# from rtc_handle_util import *
# sys.path.append(RtmToolsDir+'/embryonic_rtc')
# from EmbryonicRtc import *
sys.path = save_path
 
#
# import stub files
# 
#import _GlobalIDL
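
the save-and-restore handling of sys.path shown above can also be wrapped in a context manager, so that the path is restored even if an import fails. this is a sketch using only the standard library; the name temporary_sys_path and the directory used in the demonstration are hypothetical.

```python
import sys
from contextlib import contextmanager

@contextmanager
def temporary_sys_path(*dirs):
    """Append dirs to sys.path for the duration of the with-block only."""
    saved = sys.path[:]            # copy, as save_path does in the notebook
    sys.path.extend(dirs)
    try:
        yield
    finally:
        sys.path[:] = saved        # restore even when the block raises

# demonstration with a hypothetical directory; a real script would put
# "from rtc_handle import *" inside the with-block instead
with temporary_sys_path("/hypothetical/rtc_handle"):
    inside = "/hypothetical/rtc_handle" in sys.path
outside = "/hypothetical/rtc_handle" in sys.path
```

modules imported inside the block stay importable afterwards, because they are already cached in sys.modules.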

an RtmEnv object holds an orb, name servers, rtcs, connectors and other rtm environment information.

the second arg is a list of cos nameservers.

#
# user program 
# 
 
#
env = RtmEnv(sys.argv,[NS0])

NameSpace.list_obj() retrieves a list of corba objects in the nameserver and puts them into the NameSpace.obj_list dictionary.

if an object is an rtc, its proxy object (RtcHandle) is created and put into the NameSpace.rtc_handles dictionary.

env.name_space[NS0].list_obj()
objcet cin0.rtc was listed.
port_name: str_out
handle for cin0.rtc was created.
objcet cout0.rtc was listed.
port_name: str_in
handle for cout0.rtc was created.

[['cin0.rtc', <OpenRTM._objref_DataFlowComponent at 0x7fdbdc4fc970>],
 ['cout0.rtc', <OpenRTM._objref_DataFlowComponent at 0x7fdbcde64be0>]]
env.name_space[NS0].obj_list
{'cin0.rtc': <OpenRTM._objref_DataFlowComponent at 0x7fdbdc4fc970>,
 'cout0.rtc': <OpenRTM._objref_DataFlowComponent at 0x7fdbcde64be0>}
env.name_space[NS0].rtc_handles
{'cin0.rtc': <rtc_handle.RtcHandle at 0x7fdbdc50e3a0>,
 'cout0.rtc': <rtc_handle.RtcHandle at 0x7fdbcde662e0>}

RtcHandle is a proxy class for an rtc and the central object of this module. an rtc is accessed through its object reference, and information about the rtc can be gathered through that reference. an RtcHandle object holds this information and acts as a proxy for the rtc.

assign rtc proxies to variables

to ease access to rtc proxies, it may be a good idea to assign them to variables.

cin=env.name_space[NS0].rtc_handles['cin0.rtc']
cout=env.name_space[NS0].rtc_handles['cout0.rtc']
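
when there are many rtcs, assigning each one by hand becomes tedious. one option is to build a second dictionary keyed by the name without the ".rtc" suffix. the sketch below works on any dictionary; short_names is a hypothetical helper, and the strings in the demonstration are stand-ins for RtcHandle objects.

```python
def short_names(handles, suffix=".rtc"):
    """Return a new dict keyed by each name with the suffix removed."""
    result = {}
    for name, handle in handles.items():
        key = name[:-len(suffix)] if name.endswith(suffix) else name
        result[key] = handle
    return result

# stand-in dictionary so the sketch runs without OpenRTM-aist;
# in the notebook this would be env.name_space[NS0].rtc_handles
handles = {"cin0.rtc": "handle-for-cin0", "cout0.rtc": "handle-for-cout0"}
rtcs = short_names(handles)
print(sorted(rtcs))   # ['cin0', 'cout0']
```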

activate and deactivate rtcs

cout.activate()
RTC_OK
cout.deactivate()
RTC_OK
cout.activate()
RTC_OK

deactivation of some rtcs may fail. such rtcs may be waiting for resources (e.g. user input) in the onExecute loop. for example,

cin.activate()
RTC_OK
cin.deactivate()
RTC_ERROR

but it usually recovers once the resource becomes available. please input something at the cin console. then,

cin.activate()
RTC_OK
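
a script that cannot wait for a person to notice the error may retry an operation a few times. the following is a minimal sketch, not part of rtc_handle: retry_until_ok is a hypothetical helper, it assumes the return value prints as RTC_OK on success (as in the outputs above), and FakeRtc is a stand-in so the sketch runs without a real rtc. whether retrying is appropriate depends on the rtc in question.

```python
import time

def retry_until_ok(operation, attempts=5, interval=0.5):
    """Call operation() until its result prints as 'RTC_OK' or attempts run out."""
    result = None
    for attempt in range(attempts):
        result = operation()
        if str(result) == "RTC_OK":      # assumption: compare the printed form
            break
        if attempt < attempts - 1:
            time.sleep(interval)         # give the rtc time to recover
    return result

class FakeRtc:
    """Stand-in that fails twice and then succeeds (not a real RtcHandle)."""
    def __init__(self):
        self.calls = 0
    def activate(self):
        self.calls += 1
        return "RTC_OK" if self.calls > 2 else "RTC_ERROR"

fake = FakeRtc()
final = retry_until_ok(fake.activate, attempts=5, interval=0.0)
```

with a real handle the call would be something like retry_until_ok(cin.activate).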

direct access to Inports and Outports

if the interface_type of the ports is corba_cdr, you can put data to inports and get data from outports by using rtc_handle.py.

put data to inport
cout.inports
{'str_in': <rtc_handle.RtcInport at 0x7fdbcde6f6a0>}
cout.inports['str_in'].open()
RTC_OK
cout.inports['str_in'].write('abc')
PORT_OK
cout.inports['str_in'].close()
RTC_OK

get data from outport

by connecting to an outport with 'dataport.dataflow_type' set to 'pull', you can get 'dataport.corba_cdr.outport_ref'.

through this ref you can directly get the latest data (provided it has not already been consumed by other rtcs).

cin.outports
{'str_out': <rtc_handle.RtcOutport at 0x7fdbcde66220>}
cin.outports['str_out'].open()
RTC_OK
cin.outports['str_out'].con.prop_dict
{'dataport.dataflow_type': 'pull',
 'dataport.interface_type': 'corba_cdr',
 'dataport.subscription_type': 'new',
 'dataport.publisher.push_policy': 'new',
 'dataport.inport.buffer.length': '1',
 'dataport.inport.buffer.read.empty_policy': 'do_nothing',
 'dataport.inport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.length': '1',
 'dataport.outport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.read.empty_policy': 'do_nothing',
 'dataport.data_type': 'TimedString',
 'dataport.serializer.cdr.endian': 'little,big',
 'dataport.corba_cdr.outport_ior': 'IOR:010000001b00000049444c3a4f70656e52544d2f4f7574506f72744364723a312e300000010000000000000064000000010102000e0000003139322e3136382e35302e33350063cd0e000000fe37b1886100008405000000000a00000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
 'dataport.corba_cdr.outport_ref': <OpenRTM._objref_OutPortCdr at 0x7fdbcce03550>}
cin.outports['str_out'].read()
cin.outports['str_out'].close()
RTC_OK
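
both examples above follow the same open, use, close sequence. a small context manager can guarantee that close() is called even if the write or read raises. this is a sketch: opened is a hypothetical helper that relies only on the open() and close() methods shown above, and FakePort is a stand-in so the code runs without OpenRTM-aist.

```python
from contextlib import contextmanager

@contextmanager
def opened(port):
    """Open a port, hand it to the with-block, and always close it afterwards."""
    port.open()
    try:
        yield port
    finally:
        port.close()

class FakePort:
    """Stand-in that records the calls it receives (not a real RtcInport)."""
    def __init__(self):
        self.log = []
    def open(self):
        self.log.append("open")
    def write(self, data):
        self.log.append(("write", data))
    def close(self):
        self.log.append("close")

port = FakePort()
with opened(port) as p:
    p.write("abc")
```

with real handles this would read: with opened(cout.inports['str_in']) as p: p.write('abc').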

IOConnector contains information for connecting io-ports

create a connector between cin.outports['str_out'] and cout.inports['str_in']

con = IOConnector([cin.outports['str_out'], cout.inports['str_in']])

the default properties of the connector are as follows

con.def_prop
{'dataport.dataflow_type': 'push',
 'dataport.interface_type': 'corba_cdr',
 'dataport.subscription_type': 'new',
 'dataport.publisher.push_policy': 'new',
 'dataport.inport.buffer.length': '1',
 'dataport.inport.buffer.read.empty_policy': 'do_nothing',
 'dataport.inport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.length': '1',
 'dataport.outport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.read.empty_policy': 'do_nothing'}

connect ports

con.connect()
RTC_OK
con.profile
RTC.ConnectorProfile(name='cin0.str_out_cout0.str_in', connector_id='83a8f542-408b-11ec-b42d-594718fca3b7', ports=[<RTC._objref_PortService object at 0x7fdbcde71d90>, <RTC._objref_PortService object at 0x7fdbcde71ca0>], properties=[SDOPackage.NameValue(name='dataport.dataflow_type', value=CORBA.Any(CORBA.TC_string, 'push')), SDOPackage.NameValue(name='dataport.interface_type', value=CORBA.Any(CORBA.TC_string, 'corba_cdr')), SDOPackage.NameValue(name='dataport.subscription_type', value=CORBA.Any(CORBA.TC_string, 'new')), SDOPackage.NameValue(name='dataport.publisher.push_policy', value=CORBA.Any(CORBA.TC_string, 'new')), SDOPackage.NameValue(name='dataport.inport.buffer.length', value=CORBA.Any(CORBA.TC_string, '1')), SDOPackage.NameValue(name='dataport.inport.buffer.read.empty_policy', value=CORBA.Any(CORBA.TC_string, 'do_nothing')), SDOPackage.NameValue(name='dataport.inport.buffer.write.full_policy', value=CORBA.Any(CORBA.TC_string, 'overwrite')), SDOPackage.NameValue(name='dataport.outport.buffer.length', value=CORBA.Any(CORBA.TC_string, '1')), SDOPackage.NameValue(name='dataport.outport.buffer.write.full_policy', value=CORBA.Any(CORBA.TC_string, 'overwrite')), SDOPackage.NameValue(name='dataport.outport.buffer.read.empty_policy', value=CORBA.Any(CORBA.TC_string, 'do_nothing')), SDOPackage.NameValue(name='dataport.data_type', value=CORBA.Any(CORBA.TC_string, 'TimedString')), SDOPackage.NameValue(name='dataport.serializer.cdr.endian', value=CORBA.Any(CORBA.TC_string, 'little,big')), SDOPackage.NameValue(name='dataport.corba_cdr.inport_ior', value=CORBA.Any(CORBA.TC_string, 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001900000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100')), SDOPackage.NameValue(name='dataport.corba_cdr.inport_ref', 
value=CORBA.Any(CORBA.TypeCode("IDL:OpenRTM/InPortCdr:1.0"), <OpenRTM._objref_InPortCdr object at 0x7fdbcce08910>))])
con.prop_dict
{'dataport.dataflow_type': 'push',
 'dataport.interface_type': 'corba_cdr',
 'dataport.subscription_type': 'new',
 'dataport.publisher.push_policy': 'new',
 'dataport.inport.buffer.length': '1',
 'dataport.inport.buffer.read.empty_policy': 'do_nothing',
 'dataport.inport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.length': '1',
 'dataport.outport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.read.empty_policy': 'do_nothing',
 'dataport.data_type': 'TimedString',
 'dataport.serializer.cdr.endian': 'little,big',
 'dataport.corba_cdr.inport_ior': 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001900000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
 'dataport.corba_cdr.inport_ref': <OpenRTM._objref_InPortCdr at 0x7fdbcce08910>}
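
con.prop_dict carries the same name/value pairs as the properties list in con.profile, in a form that is easier to read. the relation between the two can be sketched as below. this is an illustration rather than the code rtc_handle actually uses: it assumes each property has a name attribute and a value attribute whose value() method returns the wrapped Python object (as CORBA.Any does in omniORBpy), and NameValue and FakeAny are stand-ins so the sketch runs without CORBA.

```python
from collections import namedtuple

def properties_to_dict(properties):
    """Flatten a list of name/value pairs into a plain dictionary."""
    return {nv.name: nv.value.value() for nv in properties}

# stand-ins for SDOPackage.NameValue and CORBA.Any (assumptions of this sketch)
NameValue = namedtuple("NameValue", ["name", "value"])

class FakeAny:
    def __init__(self, typecode, value):
        self._typecode = typecode
        self._value = value
    def value(self):
        return self._value

properties = [
    NameValue("dataport.dataflow_type", FakeAny("string", "push")),
    NameValue("dataport.interface_type", FakeAny("string", "corba_cdr")),
]
as_dict = properties_to_dict(properties)
```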

disconnect ports

con.disconnect()
RTC_OK

change properties

you can change properties by giving prop_dict

con = IOConnector([cin.outports['str_out'], cout.inports['str_in']], 
                  prop_dict={'dataport.inport.buffer.length': '8'})
con.prop_dict_req
{'dataport.dataflow_type': 'push',
 'dataport.interface_type': 'corba_cdr',
 'dataport.subscription_type': 'new',
 'dataport.publisher.push_policy': 'new',
 'dataport.inport.buffer.length': '8',
 'dataport.inport.buffer.read.empty_policy': 'do_nothing',
 'dataport.inport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.length': '1',
 'dataport.outport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.read.empty_policy': 'do_nothing',
 'dataport.data_type': 'TimedString'}
con.connect()
RTC_OK
con.prop_dict
{'dataport.dataflow_type': 'push',
 'dataport.interface_type': 'corba_cdr',
 'dataport.subscription_type': 'new',
 'dataport.publisher.push_policy': 'new',
 'dataport.inport.buffer.length': '8',
 'dataport.inport.buffer.read.empty_policy': 'do_nothing',
 'dataport.inport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.length': '1',
 'dataport.outport.buffer.write.full_policy': 'overwrite',
 'dataport.outport.buffer.read.empty_policy': 'do_nothing',
 'dataport.data_type': 'TimedString',
 'dataport.serializer.cdr.endian': 'little,big',
 'dataport.corba_cdr.inport_ior': 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001a00000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
 'dataport.corba_cdr.inport_ref': <OpenRTM._objref_InPortCdr at 0x7fdbcce08f70>}
con.disconnect()
RTC_OK
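
comparing con.def_prop with con.prop_dict_req above, the entries given in prop_dict replace the defaults with the same key and all other defaults are kept. that effect can be reproduced with a plain dictionary merge. the sketch below only illustrates the observed result; merge_properties is a hypothetical name and not how rtc_handle is necessarily implemented.

```python
def merge_properties(defaults, overrides=None):
    """Return a copy of defaults in which the entries of overrides take precedence."""
    merged = dict(defaults)          # copy, so the defaults stay untouched
    merged.update(overrides or {})
    return merged

# a subset of the default properties listed earlier
def_prop = {
    "dataport.dataflow_type": "push",
    "dataport.interface_type": "corba_cdr",
    "dataport.inport.buffer.length": "1",
}
requested = merge_properties(def_prop, {"dataport.inport.buffer.length": "8"})
print(requested["dataport.inport.buffer.length"])   # 8
```

note that the values are strings ('8', not 8), matching the property values shown in the outputs above.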

conflict of connections

only one connection is permitted between the same pair of ports.

so, if another connection already exists, you cannot control it through your connector.

for example,

b = IOConnector([cin.outports['str_out'], cout.inports['str_in']])
b.connect()
RTC_OK
con.connect()
there exists another connection. please try force=True.

'NO'
con.disconnect()
there exists another connection. please try force=True.

'NO'

you can handle this situation by forcing the connect/disconnect operation

con.connect(force=True)
RTC_OK
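
in a script, the two steps can be combined: try a normal connect first and force it only when that is explicitly allowed. the following is a minimal sketch. connect_with_fallback is a hypothetical helper; it relies on connect() returning the string 'NO' on conflict and on the force=True keyword, both as shown above, and FakeConnector is a stand-in so the code runs without OpenRTM-aist.

```python
def connect_with_fallback(connector, allow_force=False):
    """Connect normally; if refused with 'NO' and allowed, retry with force=True."""
    result = connector.connect()
    if result == "NO" and allow_force:
        result = connector.connect(force=True)
    return result

class FakeConnector:
    """Stand-in that refuses unforced connects while another connection exists."""
    def __init__(self, other_exists):
        self.other_exists = other_exists
    def connect(self, force=False):
        if self.other_exists and not force:
            return "NO"
        return "RTC_OK"

refused = connect_with_fallback(FakeConnector(other_exists=True))
forced = connect_with_fallback(FakeConnector(other_exists=True), allow_force=True)
```

forcing is left off by default because it takes over a connection that something else may still rely on.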
 
  • Last modified: 2021/11/09 12:59
  • by Takashi Suehiro