Memperbaiki teks kacau saat menyinkronkan oracle ke doris dengan seatunnel 2.3.9

Saat menggunakan Atur terowongan 2.3.9 untuk menyinkronkan data dari Peramal ke DorisAnda mungkin menghadapi karakter yang kacau – terutama jika database Oracle menggunakan Set karakter ASCII. Tapi jangan panik – artikel ini menuntun Anda Mengapa ini terjadi dan Cara memperbaikinya.

๐Ÿง  akar penyebab

Masalah ini berasal dari bagaimana Seatunnel membaca data dari Oracle. Jika Oracle menggunakan set karakter seperti ASCII, dan Anda menyinkronkan ke Doris (yang mengharapkan UTF-8 yang tepat atau penyandian yang kompatibel), karakter Cina dapat menjadi tidak dapat dibaca.

Kuncinya adalah mencegat dan menilai ulang data saat dibaca dari oracle ResultSet.

๐Ÿ” Memahami Aliran Membaca Seatunnel

Mari kita lihat internal seatunnel yang menangani konsumsi data JDBC:

1. JdbcSourceFactory

Kelas ini:

  • Memuat konfigurasi sumber Anda.
  • Konstruksi JdbcSourceConfig Dan JdbcDialect.
  • Menciptakan a JdbcSource contoh.

2. JdbcSource

Ini:

  • Menginisialisasi a SourceSplitEnumerator untuk membagi tugas.
  • Menciptakan a JdbcSourceReader untuk mengeksekusi mereka.

3. JdbcSourceReader

Bertanggung jawab atas:

  • Membangun JdbcInputFormat.
  • Berulang kali memanggil pollNext() Metode untuk mengambil data.

4. pollNext() Metode

Metode ini:

  • Panggilan open() di dalam JdbcInputFormat untuk mempersiapkan PreparedStatement Dan ResultSet.
  • Lalu menelepon nextRecord() untuk memproses ResultSet dan mengubahnya menjadi a SeaTunnelRow.

5. nextRecord() dan masalah penyandian

Di dalam JdbcInputFormat:

  • Itu nextRecord() Panggilan metode toInternal() di dalam JdbcRowConverter.
  • Implementasi default menggunakan JdbcFieldTypeUtils.getString(rs, resultSetIndex).

๐Ÿ’ฅ Masalah: Jika hasilnya berisi karakter Cina yang disimpan sebagai ASCII, metode ini mengembalikan teks yang kacau.

โœ… Strategi Solusi

Kita perlu mendeteksi penyandian sumber dan menyandikan ulang data Saat ini diambil dari hasil.

Inilah cara melakukannya:

๐Ÿ›  Langkah Implementasi

Langkah 1: Tambahkan parameter charset

Di dalam JdbcInputFormatmenambahkan:

private final Map params;

Di konstruktor:

public JdbcInputFormat(JdbcSourceConfig config, Map tables) {
    this.jdbcDialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
    this.chunkSplitter = ChunkSplitter.create(config);
    this.jdbcRowConverter = jdbcDialect.getRowConverter();
    this.tables = tables;
    this.params = config.getJdbcConnectionConfig().getProperties(); // 

Langkah 2: Lulus params ke konverter baris

Di nextRecord() metode JdbcInputFormatperbarui panggilan metode ke:

SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema, params);

Langkah 3: Tambahkan metode pengkodean

Di dalam AbstractJdbcRowConvertermendefinisikan:

public static String convertCharset(byte[] value, String charSet) {
    if (value == null || value.length == 0) {
        return null;
    }
    log.info("Value bytes: {}", Arrays.toString(value));
    try {
        return new String(value, charSet);
    } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
    }
}

Langkah 4: Ubah toInternal() untuk tipe string

Di dalam AbstractJdbcRowConverterperbarui STRING Ketik penanganan seperti itu:

case STRING:
    if (params == null || params.isEmpty()) {
        fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
    } else {
        String sourceCharset = params.get("sourceCharset");
        if ("GBK".equalsIgnoreCase(sourceCharset)) {
            fields[fieldIndex] = convertCharset(JdbcFieldTypeUtils.getBytes(rs, resultSetIndex), sourceCharset);
        } else {
            fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
        }
    }
    break;

Langkah 5: Membangun kembali dan menggunakan

Setelah membuat perubahan di atas:

  1. Membangun kembali connector-jdbc modul.
  2. Ganti yang ada connector-jdbc-2.3.9.jar di bawah seatunnel connectors direktori.
  3. Mulai ulang cluster seatunnel.

๐Ÿงพ Tips Konfigurasi

  • Jika database Oracle Anda tidak memiliki masalah penyandianAnda tidak perlu melewati sourceCharset milik.
  • Jika perlu, lewati seperti ini di konfigurasi Anda:
sourceCharset=GBK
  • Untuk debug logging dari connector-jdbcperiksa Log Pekerja di seatunnel logs direktori.

โœ… Ringkasan

Dengan menambahkan mekanisme switching charset sederhana dan mengubah implementasi sumber JDBC, Anda dapat menghilangkan karakter yang kacau saat menyinkronkan data Oracle ke Doris menggunakan Seatunnel.

Tidak ada lagi karakter yang rusak – pipa data Anda menjadi lebih pintar. ๐Ÿš€