Now, let's examine a specific workflow DAG that you'll be completing as part of your end-to-end lab. Read through the four operators here and take your best guess at what this pipeline is achieving in the short pop-up quiz that'll appear.

Now, let's read through it together. Here you see four tasks, t1, t2, t3, and t4, and the four operators corresponding to four Google Cloud Platform services. The names should look familiar, and you can probably start to guess what this workflow does at a high level just by reading them in order. The first two are concerned with getting fresh model data out of the BigQuery data set and into GCS for consumption by our ML model later in the workflow. In the lab you're going to work on later, the data set will be one that you're already familiar with: the Google Analytics news article sample data set.

Let's see the parameters the BigQueryOperator takes. The BigQueryOperator allows you to specify a SQL query to run against a BigQuery data set. In this quick example, we're passing in a query which returns the top 100 most popular Stack Overflow posts from the BigQuery public data set for a specified date range, which you see there in the WHERE clause. Notice anything different about the filters in the WHERE clause? Yes, there are parameters, in this case for a max date and a min date. You can parameterize pieces of the SQL statement, like we did here, to only return posts for January 2018 with a min query date and a max query date. What's really neat is that you can make the parameters themselves dynamic, instead of the static ones shown here, and have them be based on the DAG's scheduled run date. For example, macros.ds_add(ds, -7) simply means one week before the DAG is scheduled to run; that's the value the date will take.

For workflows, you want to be really careful with the window of data you use for training, evaluation, and testing. What if the data from two weeks ago was fine, but last week the upstream pipeline failed and no records were returned for the query? Zero records returned for a SQL query is still a valid result, but you can imagine the implications if you had no way of validating the upstream data sources you're feeding into your workflow. This is especially true for recommendation models, which rely on timely, fresh training data to make good recommendations and avoid stale, or even worse, incorrect or inappropriate recommendations for your users.
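Before we get to safeguards, here's a rough sketch of what those first two tasks could look like in a DAG file. This is a minimal, hypothetical example, assuming an Airflow 1.10-era Cloud Composer environment with the contrib import paths; the DAG name, project, bucket, destination table, and the Stack Overflow table and columns are all stand-ins I'm assuming for illustration, not the lab's actual code.

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

# A home for the tasks below; the DAG id and schedule are illustrative.
dag = DAG(
    dag_id='recserve_retraining',
    start_date=datetime(2018, 1, 1),
    schedule_interval='@weekly',
)

# t1: run a templated query against BigQuery and stage the result in a table.
# {{ ds }} and {{ macros.ds_add(ds, -7) }} are rendered at runtime, so each
# DAG run pulls the week leading up to its scheduled date instead of a
# hard-coded January 2018 window.
t1 = BigQueryOperator(
    task_id='bq_fetch_popular_posts',
    sql='''
        SELECT id, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE DATE(creation_date)
              BETWEEN DATE('{{ macros.ds_add(ds, -7) }}') AND DATE('{{ ds }}')
        ORDER BY view_count DESC
        LIMIT 100
    ''',
    use_legacy_sql=False,
    destination_dataset_table='my-project.my_dataset.training_data',  # hypothetical
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

# t2: export the staged table to GCS, where the training job can read it.
t2 = BigQueryToCloudStorageOperator(
    task_id='bq_export_to_gcs',
    source_project_dataset_table='my-project.my_dataset.training_data',
    destination_cloud_storage_uris=['gs://my-bucket/data/train.csv'],  # hypothetical bucket
    export_format='CSV',
    dag=dag,
)
```

In the lab itself, the query will of course point at the Google Analytics news article data rather than Stack Overflow posts.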
So what can you do to guard your workflows against upstream data issues? Two words: health checks. Running SQL statements on BigQuery isn't only good for getting correct insights from clean data; it can also alert you when something is amiss. Let's take a look at an example.

There's an operator you can use called the BigQueryCheckOperator, which sounds exactly like something we could use in our workflow. Here's how it works. Say we had an upstream data source that tracked the cumulative number of students who have gone through training classes, like the one you're taking now. That's a table that logically should never pass zero records into our workflow. So we're expecting something more like this: 10,000 records or so are passed into our ML training data set, and the rest of the workflow handles the model retraining and deployment as usual.

Now let's say that, because of some data anomaly, the upstream table was accidentally taken offline, truncated, or moved, and a SELECT * into our training data set now gives us zero records. Again, that's still a valid SQL query, but zero records to train and retrain our recommendation model is going to be really, really bad for ML model training. That's where the BigQueryCheckOperator can help us out. If the result of the health check query returns zero or false, the health check fails, and you can have the rest of the workflow fail as well. Beyond that, you could even have a "sad path" branch of your DAG that sends an email alert or notification for critical pipeline failures to a human team for immediate investigation.

What I've shown is just a simple example: if the upstream data table has zero records, fail the workflow. You can also build much more complex logic into your own health checks in SQL, using things like CASE statements or joins against other tables. The benefit for you and your team is that all of these checks are written as part of the DAG, in that SQL, so they get version controlled and are highly visible. It's kind of like unit testing in programming.

Now, here's the tricky dilemma. We saw how easy it is to check for zero records with a COUNT(*) and fail the pipeline. But what about the case where something in the data values themselves changed? We're still getting records passed through, but the values are really different or strange. Take a look at an example. We have a daily workflow that ingests the average temperature of the day. Notice anything funny about October? That value is way lower than the others. If you had to guess, what do you think happened? Barring really strange summer weather patterns, our upstream data team likely switched the scale from Fahrenheit to Celsius somewhere around October 1st, and didn't tell the downstream engineering or data science teams who consume this BigQuery table as part of their workflows. If we had used the simple "does the training data have any records?" check, it would have passed and the model would have been trained, none the wiser.

Instead, we can use the BigQueryIntervalCheckOperator, which can notice huge changes or swings in the data distribution and fail the workflow until further investigation. This operator checks whether the values of metrics, given as SQL expressions, are within a certain tolerance of the same metrics from an earlier run, based on a parameter you set called days_back, which looks back at your data history from previous DAG runs to compare against.

Keep in mind that for both the BigQueryCheckOperator and the BigQueryIntervalCheckOperator, you can have more than one check run as a node in your workflow. A good idea for machine learning is to set up a check for each of your key features to test for null, zero, out-of-bounds, or irregular values before they make their way into your training data set. And of course, you can always revisit and update these checks as you engineer more features or continue to tune and train your model.
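Continuing the same hypothetical DAG file sketched earlier, a row-count health check might look like the following; the staging table name is an assumption for illustration. The BigQueryCheckOperator fails the task when the first value of the first row returned by the query evaluates to zero or false.

```python
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

# Health check: fail this task (and everything downstream of it) if the
# freshly staged training table came back empty.
check_training_rows = BigQueryCheckOperator(
    task_id='check_training_data_not_empty',
    sql='SELECT COUNT(*) FROM my_dataset.training_data',  # hypothetical staging table
    dag=dag,
)
```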
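And here's a sketch of a distribution check for the temperature example, again with made-up table and column names. In this operator, metrics_thresholds maps each SQL expression to the maximum allowed ratio between the current value and the value from days_back runs ago.

```python
from airflow.contrib.operators.bigquery_check_operator import BigQueryIntervalCheckOperator

# Distribution check: compare today's average temperature against the value
# from seven days back, and fail if the two differ by more than a 1.5x ratio,
# which a silent Fahrenheit-to-Celsius switch would easily trigger.
check_temperature_swing = BigQueryIntervalCheckOperator(
    task_id='check_avg_temperature_swing',
    table='my_dataset.daily_weather',                   # hypothetical table
    metrics_thresholds={'AVG(avg_temperature)': 1.5},   # hypothetical column
    date_filter_column='ds',  # column holding the run date used for the comparison
    days_back=-7,
    dag=dag,
)
```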
The next two operators handle retraining the model, by submitting a job to Cloud Machine Learning Engine, and then deploying the updated model to App Engine behind an API endpoint. The ML Engine training operator is how Cloud Composer interacts with Cloud Machine Learning Engine, which gives it the ability to periodically schedule new jobs to be sent to CMLE as part of your automated workflow.

Take a look through the parameters you provide to the operator. First, we specify an ID for the task we're running, and then the project we want to run it in. For our lab, this will be the same project that's also managing the Airflow instance and everything else you're working with. Then you have the option of creating your own job ID, which usually involves a concatenated timestamp or a similar unique identifier. After that is the path to the actual model code, which we'll have in a zip file created by a shell script in our lab. Then comes the actual Python module name within your code to run inside the ML Engine training job after the package is installed; in this example, we've called it trainer.task. Next is a series of arguments for training, which we store in an array called training_args. Those include the location of the job directory, where the training files are, where the output directory is, and what data we're using. The next three, region, scale_tier, and master_type, are GCP infrastructure parameters for where you want your job to be run and with what specialized hardware, like GPUs or TPUs, if you're using any.

Lastly, once your model is retrained and ready to be served as an API endpoint, we need to redeploy our App Engine project with the latest model. Here the parameters are simple, since we're just redeploying the existing App Engine instance associated with our project.

And that's it. Well, almost: that's the last task, but at the end of most DAG files you'll find the actual order in which we want these tasks and operators to run. This is what the D in DAG stands for: directed. For example, task two, t2, won't run until t1 has completed. This is what gives our graph the dependencies of a workflow. I can probably guess what you're thinking at this point: you could build in some pretty cool branching, with multiple child nodes per upstream parent, and absolutely, that's totally possible. Just don't forget to comment your code so you know where one branch begins, where it's going, what tasks are involved, and where it ends.

As a tip, after you load your DAG file into the DAG folder, you can see a visualization of your DAG in the Airflow UI as a directed graph, a Gantt chart, or a list if you want. Reviewing the visual representation of the ordered tasks will help you confirm that they're ordered the way you want them.
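To tie it all together, here's a rough sketch of how those last two tasks and the final dependency chain might look, continuing the same hypothetical DAG file as before. The project, bucket, job naming, and training arguments are all made up, and because Airflow's contrib library has no dedicated App Engine operator, the redeploy step is shown here as a plain BashOperator wrapping gcloud, which is just one way it could be done; the lab's own deployment step may differ.

```python
from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator
from airflow.operators.bash_operator import BashOperator

# t3: submit a retraining job to Cloud ML Engine. ds_nodash renders the run
# date as YYYYMMDD, keeping the job ID unique for each scheduled run.
t3 = MLEngineTrainingOperator(
    task_id='ml_engine_training_op',
    project_id='my-gcp-project',                              # hypothetical project
    job_id='recserve_training_{{ ds_nodash }}',
    package_uris=['gs://my-bucket/code/trainer-0.1.tar.gz'],  # packaged model code
    training_python_module='trainer.task',                    # module run after install
    training_args=[
        '--job-dir=gs://my-bucket/jobs/{{ ds_nodash }}',
        '--train-files=gs://my-bucket/data/train.csv',
        '--output-dir=gs://my-bucket/models/',
    ],
    region='us-central1',
    scale_tier='BASIC_GPU',  # or scale_tier='CUSTOM' plus master_type for specific machines
    dag=dag,
)

# t4: redeploy the serving App Engine app so the endpoint picks up the new model.
t4 = BashOperator(
    task_id='app_engine_deploy_op',
    bash_command='gcloud app deploy --project my-gcp-project --quiet',
    dag=dag,
)

# The "directed" part of the DAG: t2 waits on t1, t3 on t2, and so on.
# Health checks like the ones sketched earlier would slot in between t2 and t3.
t1 >> t2 >> t3 >> t4
```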