Saat menggunakan Atur terowongan 2.3.9 untuk menyinkronkan data dari Peramal ke DorisAnda mungkin menghadapi karakter yang kacau – terutama jika database Oracle menggunakan Set karakter ASCII. Tapi jangan panik – artikel ini menuntun Anda Mengapa ini terjadi dan Cara memperbaikinya.
๐ง akar penyebab
Masalah ini berasal dari bagaimana Seatunnel membaca data dari Oracle. Jika Oracle menggunakan set karakter seperti ASCII, dan Anda menyinkronkan ke Doris (yang mengharapkan UTF-8 yang tepat atau penyandian yang kompatibel), karakter Cina dapat menjadi tidak dapat dibaca.
Kuncinya adalah mencegat dan menilai ulang data saat dibaca dari oracle ResultSet
.
๐ Memahami Aliran Membaca Seatunnel
Mari kita lihat internal seatunnel yang menangani konsumsi data JDBC:
1. JdbcSourceFactory
Kelas ini:
- Memuat konfigurasi sumber Anda.
- Konstruksi
JdbcSourceConfig
DanJdbcDialect
. - Menciptakan a
JdbcSource
contoh.
2. JdbcSource
Ini:
- Menginisialisasi a
SourceSplitEnumerator
untuk membagi tugas. - Menciptakan a
JdbcSourceReader
untuk mengeksekusi mereka.
3. JdbcSourceReader
Bertanggung jawab atas:
- Membangun
JdbcInputFormat
. - Berulang kali memanggil
pollNext()
Metode untuk mengambil data.
4. pollNext()
Metode
Metode ini:
- Panggilan
open()
di dalamJdbcInputFormat
untuk mempersiapkanPreparedStatement
DanResultSet
. - Lalu menelepon
nextRecord()
untuk memprosesResultSet
dan mengubahnya menjadi aSeaTunnelRow
.
5. nextRecord()
dan masalah penyandian
Di dalam JdbcInputFormat
:
- Itu
nextRecord()
Panggilan metodetoInternal()
di dalamJdbcRowConverter
. - Implementasi default menggunakan
JdbcFieldTypeUtils.getString(rs, resultSetIndex)
.
๐ฅ Masalah: Jika hasilnya berisi karakter Cina yang disimpan sebagai ASCII, metode ini mengembalikan teks yang kacau.
โ Strategi Solusi
Kita perlu mendeteksi penyandian sumber dan menyandikan ulang data Saat ini diambil dari hasil.
Inilah cara melakukannya:
๐ Langkah Implementasi
Langkah 1: Tambahkan parameter charset
Di dalam JdbcInputFormat
menambahkan:
private final Map params;
Di konstruktor:
public JdbcInputFormat(JdbcSourceConfig config, Map tables) {
this.jdbcDialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
this.chunkSplitter = ChunkSplitter.create(config);
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.tables = tables;
this.params = config.getJdbcConnectionConfig().getProperties(); //
Langkah 2: Lulus params
ke konverter baris
Di nextRecord()
metode JdbcInputFormat
perbarui panggilan metode ke:
SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema, params);
Langkah 3: Tambahkan metode pengkodean
Di dalam AbstractJdbcRowConverter
mendefinisikan:
public static String convertCharset(byte[] value, String charSet) {
if (value == null || value.length == 0) {
return null;
}
log.info("Value bytes: {}", Arrays.toString(value));
try {
return new String(value, charSet);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
Langkah 4: Ubah toInternal()
untuk tipe string
Di dalam AbstractJdbcRowConverter
perbarui STRING
Ketik penanganan seperti itu:
case STRING:
if (params == null || params.isEmpty()) {
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
} else {
String sourceCharset = params.get("sourceCharset");
if ("GBK".equalsIgnoreCase(sourceCharset)) {
fields[fieldIndex] = convertCharset(JdbcFieldTypeUtils.getBytes(rs, resultSetIndex), sourceCharset);
} else {
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
}
}
break;
Langkah 5: Membangun kembali dan menggunakan
Setelah membuat perubahan di atas:
- Membangun kembali
connector-jdbc
modul. - Ganti yang ada
connector-jdbc-2.3.9.jar
di bawah seatunnelconnectors
direktori. - Mulai ulang cluster seatunnel.
๐งพ Tips Konfigurasi
- Jika database Oracle Anda tidak memiliki masalah penyandianAnda tidak perlu melewati
sourceCharset
milik. - Jika perlu, lewati seperti ini di konfigurasi Anda:
sourceCharset=GBK
- Untuk debug logging dari
connector-jdbc
periksa Log Pekerja di seatunnellogs
direktori.
โ Ringkasan
Dengan menambahkan mekanisme switching charset sederhana dan mengubah implementasi sumber JDBC, Anda dapat menghilangkan karakter yang kacau saat menyinkronkan data Oracle ke Doris menggunakan Seatunnel.
Tidak ada lagi karakter yang rusak – pipa data Anda menjadi lebih pintar. ๐