我有一個 SQLite 資料庫,我想將其匯入 DataBricks 上的 Spark。
當我運行下面的命令時,我在該命令下方收到錯誤。
df = spark.read.format('jdbc') \
.options(driver='org.sqlite.JDBC', dbtable='issn',
url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite').load()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-4113757> in <module>
6 df = spark.read.format('jdbc') \
7 .options(driver='org.sqlite.JDBC', dbtable='issn',
----> 8 url='jdbc:sqlite:/dbfs/mnt/if_i_told_you_the_path_i_would_have_to_kill_you/lanabug.sqlite').load()
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
182 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
183 else:
--> 184 return self._df(self._jreader.load())
185
186 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o4773.load.
: java.sql.SQLException: Unsupported type NULL
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:256)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:373)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
at sun.reflect.GeneratedMethodAccessor1612.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
有沒有辦法得到正確的模式推斷,或指定正確的模式schema = StructType([StructField('modified_issn', IntegerType()), StructField('codes', StringType())])?我嘗試了明顯的假設選項,例如inferSchema='true', schema=schema, sqliteSchema=schema,jdbcSchema=schema但都沒有奏效。請注意,我不是管理員,無法以任何方式重新配置系統。
MWE,重現步驟:
匯入以下代碼并運行
spawn_and_test_cache('test.sqlite')以創建資料庫。運行我之前提供的命令,替換適當的路徑。
import sqlite3
def convert_issn(issn):
if issn[-1] != 'X':
return int(issn)
else:
return 1000000000 int(issn.replace('X', '0'))
def read_cache(isxn, filename='.sdxml.sqlite', create=False, mode='issn'):
c = sqlite3.connect(filename)
if mode == 'issn':
modified_isxn = convert_issn(isxn)
else:
modified_isxn = int(isxn)
sql = 'PRAGMA journal_mode=WAL;'
c.execute(sql)
if create:
sql = ('CREATE TABLE IF NOT EXISTS'
' %s(modified_%s INTEGER PRIMARY KEY, codes STRING)'
% (mode, mode))
c.execute(sql)
while True:
try:
c.commit()
break
except sqlite3.OperationalError:
time.sleep(0.001)
except:
break
curs = c.cursor()
curs.execute('SELECT codes FROM %s WHERE modified_%s = %d'
% (mode, mode, modified_isxn))
row = curs.fetchone()
if row is None:
c.close()
return None
c.close()
return row[0]
def write_cache(isxn, codes, filename='.sdxml.sqlite', create=False,
mode='issn'):
c = sqlite3.connect(filename)
if mode == 'issn':
modified_isxn = convert_issn(isxn)
else:
modified_isxn = int(isxn)
sql = 'PRAGMA journal_mode=WAL;'
c.execute(sql)
if create:
sql = ('CREATE TABLE IF NOT EXISTS'
' %s(modified_%s INTEGER PRIMARY KEY, codes STRING);'
% (mode, mode))
c.execute(sql)
while True:
try:
c.commit()
break
except sqlite3.OperationalError:
time.sleep(0.0001)
except Exception:
break
sql = ('INSERT OR REPLACE INTO %s values (%d, "%s")'
% (mode, modified_isxn, codes))
c.execute(sql)
while True:
try:
c.commit()
break
except sqlite3.OperationalError:
time.sleep(0.0001)
except Exception:
c.close()
return False
c.close()
return True
def spawn_and_test_cache(filename='.sdxml.sqlite'):
write_cache('10000000000000', 'abc', filename=filename, create=True)
write_cache('10000000000000', 'def', filename=filename, create=True,
mode='isbn')
return (read_cache('10000000000000', filename=filename),
read_cache('10000000000000', filename=filename, mode='isbn'))```
uj5u.com熱心網友回復:
對于 SQLite 表的定義,使用TEXT(而不是不受支持的 STRING)-
你很好:-)

(spark.read.format('jdbc')
.options(driver='org.sqlite.JDBC', dbtable='sqlite_master',url='jdbc:sqlite:sdxml.sqlite').load()
.show(truncate=False)
)
----- ---- -------- -------- ----------------------------------------------------------------
|type |name|tbl_name|rootpage|sql |
----- ---- -------- -------- ----------------------------------------------------------------
|table|issn|issn |2 |CREATE TABLE issn(modified_issn INTEGER PRIMARY KEY, codes TEXT)|
|table|isbn|isbn |3 |CREATE TABLE isbn(modified_isbn INTEGER PRIMARY KEY, codes TEXT)|
----- ---- -------- -------- ----------------------------------------------------------------
(spark.read.format('jdbc')
.options(driver='org.sqlite.JDBC', dbtable='issn',url='jdbc:sqlite:sdxml.sqlite').load()
.show()
)
-------------- -----
| modified_issn|codes|
-------------- -----
|10000000000000| abc|
-------------- -----
(spark.read.format('jdbc')
.options(driver='org.sqlite.JDBC', dbtable='isbn',url='jdbc:sqlite:sdxml.sqlite').load()
.show()
)
-------------- -----
| modified_isbn|codes|
-------------- -----
|10000000000000| def|
-------------- -----
P.S.
customSchema does control the schema, however it does not prevent the error for unknown type at the source.
I'm guessing that the JDBC driver has mapping table for the data types and once it failed to find a data type of the source, it throws an exception.
driver = 'org.sqlite.JDBC'
query = 'select modified_issn from issn'
url = 'jdbc:sqlite:sdxml.sqlite'
data_types = ['tinyint','smallint','int','bigint','string']
for dt in data_types:
spark.read.format('jdbc').options(customSchema = f'modified_issn {dt}' ,driver=driver ,query=query ,url=url).load().printSchema()
root
|-- modified_issn: byte (nullable = true)
root
|-- modified_issn: short (nullable = true)
root
|-- modified_issn: integer (nullable = true)
root
|-- modified_issn: long (nullable = true)
root
|-- modified_issn: string (nullable = true)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/447110.html
