Kiat Produksi Aliran Udara — Penyelesaian Tugas yang Tepat (Bukan DAG) – Menuju AI

Kiat Produksi Aliran Udara — Penyelesaian Tugas yang Tepat (Bukan DAG) – Menuju AI

Pengarang: Guilherme Banhudo

Awalnya diterbitkan di Towards AI the World’s Leading AI and Technology News and Media Company. Jika Anda sedang membangun produk atau layanan terkait AI, kami mengundang Anda untuk mempertimbangkan untuk menjadi sponsor AI. Di Towards AI, kami membantu menskalakan AI dan startup teknologi. Biarkan kami membantu Anda melepaskan teknologi Anda kepada massa.

Kiat Produksi Aliran Udara — Penyelesaian Tugas yang Tepat (Bukan DAG)

Foto oleh Jackson Simmer di Unsplash

Apache Airflow telah menjadi standar de facto untuk Orkestrasi Data. Namun, selama bertahun-tahun dan versi, itu mengumpulkan serangkaian nuansa dan bug yang dapat menghambat penggunaan produksinya.

Rangkaian artikel ini bertujuan untuk memandu pengguna Apache Airflow melalui proses mengatasi masalah ini, masalah yang sama yang saya hadapi.

Catatan: Seperti biasa, semua kode tersedia di repositori GitHub saya, di sini.

Tugas yang Tepat (Bukan DAG) catchup

TLDR: Kemampuan Airflow untuk mendapatkan tanggal TaskInstance sebelumnya yang berhasil tidak berfungsi sebagaimana mestinya, sebaliknya mengembalikan tanggal run DAG yang berhasil sebelumnya, yang mencegah Anda mengambil informasi yang hilang/gagal secara akurat dan benar. Dalam posting ini, Anda akan mengetahui cara memotongnya. Lihat laporan bug asli.

TLDR #2: Klik di sini untuk langsung ke solusi

Pernyataan masalah

Salah satu fitur yang paling menarik dan bermanfaat dari Apache Airflow adalah kemampuannya untuk mengejar masa lalu jika tugas gagal. Tidak, saya tidak mengacu pada parameter catchup yang dapat Anda tentukan di DAG Anda, melainkan memberi Tasks kemampuan untuk mencakup TaskInstances yang sebelumnya gagal (dan tanggal eksekusinya) menggunakan salah satu dari

Template JINJA, atau langsung mengakses TaskInstance sebelumnya yang statusnya sukses:

https://medium.com/media/6712aa06940cf80b6b5899f0941a7cdb/href

Ini sangat berguna ketika Anda ingin memastikan data Anda tetap mutakhir di Ingesti atau ETL Anda menggunakan semua informasi untuk TaskInstances saat ini dan yang gagal, misalnya:

https://medium.com/media/0953ef183f32473dc50b371943c359e7/href

Pada contoh yang lebih praktis, dengan mempertimbangkan gambar di bawah, Anda akan mengharapkan bahwa jika tugas pertama transform_data gagal saat menggunakan konfigurasi di atas, data yang gagal/hilang akan diambil pada menjalankan transform_data berikutnya, dalam satu jam berikutnya, benar ?

DAG seperti ETL sederhana yang terdiri dari tiga tugas yang bergantung secara linier

Loop Peristiwa yang Diharapkan:

lingkaran 1
– Jalankan tugas extract_from_db, ambil satu jam data, status: sukses
– Jalankan tugas tranform_data, ubah satu jam data, status: gagal
– Jalankan tugas load_target_db, muat satu jam data, status: gagal Loop 2
– Jalankan tugas extract_from_db, ambil satu jam data, status: sukses
– Jalankan tugas tranform_data, ubah data dua jam (saat ini dan sebelumnya gagal), status: sukses
– Jalankan tugas load_target_db, muat dua jam data (saat ini dan sebelumnya gagal), status: sukses

Sayangnya, itu tidak benar. Meskipun ini adalah perilaku yang jelas dan diharapkan, kode warisan Airflow mencegah hal itu terjadi. Lihat laporan bug asli.

Di bawah ini adalah template yang dirender yang sesuai untuk menjalankan tugas transform_data pertama dan kedua, masing-masing:

https://medium.com/media/a5d909553366e31daf47859ce48ff7ce/hrefhttps://medium.com/media/47b352bc376b8b25f6ad021d6c5f4d5a/href

Perhatikan bahwa tugas kedua tidak mengkompensasi kegagalan sebelumnya. Sebagai gantinya, ia hanya melewati parameter tanggal eksekusi sebelumnya, 20221013T010000 alih-alih menyertakan run sukses sebelumnya, catch-up penuh 20221013T000000.

Mengapa ini terjadi?

Perilaku default Apache Airflow, ketika tanggal eksekusi sebelumnya berhasil, adalah melihat tanggal eksekusi sukses DAG sebelumnya, bukan pada TaskInstance, yang secara efektif membuat DAG Anda tidak mampu mengejar secara otomatis kecuali seluruh DAG gagal.

Jadi, apakah kita berharap untuk terjadi? Yah, kami berharap template yang dirender pada proses kedua adalah:

https://medium.com/media/580e070ba983bf9d439e375d50e10f34/href

Anda dapat melihat laporan bug asli tentang masalah tersebut, sejak tahun 2021 dan bahkan lebih lama di StackOverflow.

Solusinya

Seperti banyak kerangka kerja Python lainnya, Apache Airflow menggunakan ORM (Object Relational Mapper) untuk mengabstraksi akses ke database backendnya. Secara khusus, proyek SQL Alchemy yang biasa telah dimanfaatkan untuk mempercepat Apache Airflow. Ini memungkinkan kita untuk mengakses database metadata secara langsung dan memanipulasinya sesuai dengan kebutuhan kita.

Solusi untuk masalah kita kemudian dibagi menjadi empat langkah sederhana:

Ambil objek TaskInstance berdasarkan Status tertentu Ambil instance TaskInstance terakhir yang berhasil untuk Tugas yang diberikan Ambil DAGRun yang terkait dengan instance TaskInstance terakhir yang berhasil untuk Tugas yang disediakan Persenjatai DAG kami dengan kemampuan untuk menggunakan informasi ini!

Terakhir, contoh bagian praktis telah ditambahkan untuk mengilustrasikan tujuannya!

Langkah 1: Ambil objek TaskInstance berdasarkan Status tertentu

Langkah pertama sesuai dengan kemampuan untuk meminta ORM untuk mengambil database untuk mengambil menjalankan TaskInstance terakhir yang sesuai dengan keadaan tertentu:

Catatan: Fungsinya adalah generalisasi yang digunakan untuk memungkinkan Anda mengambil status tertentu yang mungkin Anda perlukan

https://medium.com/media/69163909fc202dcb6b4cd6e9b8d111d2/href

Fungsi ini cukup jelas, menanyakan objek ORM TaskInstance dan memfilternya melalui dua parameter: status pencarian yang diinginkan dan instance TaskInstance.

Langkah 2: Ambil contoh TaskInstance terakhir yang berhasil untuk Tugas yang disediakan

Memanfaatkan generalisasi yang ditentukan sebelumnya, sekarang kita dapat meminta ORM untuk mengambil hanya proses terakhir yang statusnya Sukses.

https://medium.com/media/447f8d9eef5ff7a199bf819156028272/href

Langkah 3: Ambil DAGRun yang terkait dengan instance TaskInstance terakhir yang berhasil untuk Task yang disediakan.

Setelah melacak instance TaskInstance terakhir yang berhasil, kita sekarang dapat mengambil objek DAGRun yang terkait dengan instance TaskInstance tersebut. Model DAGRun berisi informasi mengenai instance TaskInstance yang dijalankan bersama dengan beragam informasi berguna:

https://medium.com/media/9ec125df8a1323739a9e033e470c3e12/href

Berbekal pengetahuan ini, kami sekarang dapat mengambil instance DAGRun yang terkait dengan instance TaskInstance terakhir yang berhasil kami ambil dalam proses yang sama seperti sebelumnya:

https://medium.com/media/f41229e4fba53554130c090faa31156d/href

Langkah 4: Persenjatai DAG kami dengan kemampuan untuk menggunakan informasi ini!

Terakhir, kita harus memastikan Apache Airflow mengetahui fungsi baru kita dan dapat menyuntikkannya ke mesin JINJA.

Kita dapat melakukannya dengan mengimpor fungsi dan meneruskan fungsi yang ditentukan pengguna ke DAG langsung pada konstruktornya, dalam hal ini, melalui manajer konteks:

https://medium.com/media/ed392fc52b84de16a6fbffe107dacd8f/href

Anda sekarang dapat dengan bebas memanggil fungsi secara langsung di JINJA dan meneruskannya sebagai argumen ke Ekstraksi, ETL, atau proses lain yang mungkin Anda miliki!

https://medium.com/media/44892e950bd60b06ba275a11c320dafe/href

Beri tahu saya di komentar jika Anda menemukan sumber daya ini berguna dan seperti biasa, Anda dapat menemukan kode ini di repositori GitHub saya!

Kiat Produksi Aliran Udara — Penangkapan Tugas yang Tepat (Bukan DAG) awalnya diterbitkan di Towards AI on Medium, di mana orang-orang melanjutkan percakapan dengan menyoroti dan menanggapi cerita ini.

Diterbitkan melalui Menuju AI

Author: Scott Anderson