我正在使用 Jython InvokeScriptedProcessor 將資料從 json 結構構造到 sql 結構。我在使用特定功能時遇到問題。json.loads。json.loads 無法識別特殊字符,如 ?、é、á、í...
它以一種奇怪的形式寫出來。我還沒有達到任何形式來擁有它。
例如(非常簡單)
{"id":"?UECO","value":3.141592,"datetime":"....","location":"?UECO"}
如果我們嘗試在 sql 中撰寫它,例如
INSERT INTO .... (id, value) VALUES ("...",3.141592);
它會失敗。它讓我失望。我無法使用任何回傳選項回傳資料,成功或失敗,與 NiFi 的版本無關。這是我的代碼
def process(self, inputStream, outputStream):
# read input json data from flowfile content
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
data = json.loads(text)
兩者都不
data = json.loads(text.encode("utf-8"))
作業正常。文本以 unicode 形式出現。
def __generate_sql_transaction(input_data):
""" Generate SQL statement """
sql = """
BEGIN;"""
_id = input_data.get("id")
_timestamp = input_data.get("timestamp")
_flowfile_metrics = input_data.get("metrics")
_flowfile_metadata = input_data.get("metadata")
self.valid = __validate_metrics_type(_flowfile_metrics)
if self.valid is True:
self.log.error("generate insert")
sql = """
INSERT INTO
{0}.{1} (id, timestamp, metrics""".format(schema, table)
if _flowfile_metadata:
sql = ", metadata"
sql = """)
VALUES
('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))
self.log.error("generate metadata")
if _flowfile_metadata:
sql = ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
sql = """)
ON CONFLICT ({})""".format(on_conflict)
if not bool(int(self.update)):
sql = """
DO NOTHING;"""
else:
sql = """
DO UPDATE
SET"""
if bool(int(self.preference)):
sql = """
metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
else:
sql = """
metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))
else:
return ""
sql = """
COMMIT;"""
return sql
我再次將資料發送到 NiFi:
output = __generate_sql_transaction(data)
self.log.error("post generate_sql_transaction")
self.log.error(output.encode("utf-8"))
# If no sql_transaction is generated because requisites weren't met,
# set the processor output with the original flowfile input.
if output == "":
output = text
# write new content to flowfile
outputStream.write(
output.encode("utf-8")
)
那個輸出看起來像
INSERT INTO .... VALUES ("?UECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');
我的元資料中也有“?ueco”,它不適用于 id 和元資料
注意:似乎 InvokeScriptedProcessor 使用 Groove 而不是 Python 可以正常作業。但我的問題是我對 Groovy 一無所知......
有沒有人發現類似的問題?你是怎么解決的?
更新:
輸入示例:
{"id":"?UECO",
"metrics":{
"value":3.1415
},
"metadata":{
"location":"?UECO"
},
"timestamp":"2020-01-01 00:00:00 01:00"
}
期望輸出:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('?UECO',
'2020-01-01T00:00:00 01:00',
'{"value":3.1415}',
'{"location":"?UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
實際輸出:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('?UECO',
'2020-01-01T00:00:00 01:00',
'{"value":3.1415}',
'{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
uj5u.com熱心網友回復:
UPD
jython 不能正確處理位元組字串 - 所以,不要使用
.encode('utf-8')使用 java 方法將內容寫回具有特定編碼的流檔案
下面是一個正確讀取和寫入非 ascii 字符的示例,包括 ?
使用ExecuteScript帶有 jython 的處理器并替換_transform(text)函式體:
import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class FlowWriter(StreamCallback):
def _transform(self, text):
# transform incoming text here
return '@@@@' text '****'
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
new_text = self._transform(text)
IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)
flowFile = session.get()
if flowFile != None:
try:
flowFile = session.write(flowFile, FlowWriter())
flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
except Exception as e:
log.error("{}\n{}".format(e,traceback.format_exc()))
session.rollback(True) # put file back and penalize it
uj5u.com熱心網友回復:
我最近找到了這個答案。
https://stackoverflow.com/a/35882335/7634711
這不是 NiFi 的問題。這是一個問題Python2以及它如何與json圖書館一起作業。Python3如果特殊字符出現在字典鍵中,問題也會出現。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/317929.html
標籤:python-2.7 apache-nifi 杰通 调用脚本
