Oracle
 sql >> Base de données >  >> RDS >> Oracle

Pourquoi Kafka jdbc connect insère des données en tant que BLOB au lieu de varchar

Ressemble à la String de Kafka est mappé au NCLOB d'Oracle :

<table border="1">
<tr>
<th>Schema Type</th><th>MySQL</th><th>Oracle</th><th>PostgreSQL</th><th>SQLite</th>
</tr>
<tr>
<td>INT8</td><td>TINYINT</td><td>NUMBER(3,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT16</td><td>SMALLINT</td><td>NUMBER(5,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT32</td><td>INT</td><td>NUMBER(10,0)</td><td>INT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT64</td><td>BIGINT</td><td>NUMBER(19,0)</td><td>BIGINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>FLOAT32</td><td>FLOAT</td><td>BINARY_FLOAT</td><td>REAL</td><td>REAL</td>
</tr>
<tr>
<td>FLOAT64</td><td>DOUBLE</td><td>BINARY_DOUBLE</td><td>DOUBLE PRECISION</td><td>REAL</td>
</tr>
<tr>
<td>BOOLEAN</td><td>TINYINT</td><td>NUMBER(1,0)</td><td>BOOLEAN</td><td>NUMERIC</td>
</tr>
<tr>
<td>STRING</td><td>VARCHAR(256)</td><td>NCLOB</td><td>TEXT</td><td>TEXT</td>
</tr>
<tr>
<td>BYTES</td><td>VARBINARY(1024)</td><td>BLOB</td><td>BYTEA</td><td>BLOB</td>
</tr>
<tr>
<td>'Decimal'</td><td>DECIMAL(65,s)</td><td>NUMBER(*,s)</td><td>DECIMAL</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Date'</td><td>DATE</td><td>DATE</td><td>DATE</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Time'</td><td>TIME(3)</td><td>DATE</td><td>TIME</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Timestamp'</td><td>TIMESTAMP(3)</td><td>TIMESTAMP</td><td>TIMESTAMP</td><td>NUMERIC</td>
</tr>
</table>

Source :https://www.ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.admin.doc/doc/admin_kafka_jdbc_sink.html

https://docs.confluent.io/current/connect /connect-jdbc/docs/sink_connector.html

MISE À JOUR

OracleDialect classe (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/dialect/OracleDialect.java ) a CLOB codé en dur valeur et étendez-la simplement avec votre propre classe et modifiez ce mappage n'aidera pas car le type de dialecte est défini dans la méthode statique dans JdbcSinkTask (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java )

final DbDialect dbDialect = DbDialect.fromConnectionString(config.connectionUrl);