Importing COVID-19 data into Elasticsearch
UPDATE
Because Johns Hopkins University changed the way it organizes its COVID-19 data, both the pipeline and the index template have been changed. The pipeline and template configurations below have been updated to work properly when importing COVID-19 data into Elasticsearch. These changes should not impact the dashboard.
We launched an interactive Kibana dashboard for the COVID-19 outbreak. You can use the dashboard to filter and compare COVID-19 data from different countries, regions and counties, enabling you to run your own analysis of the outbreak.
The outbreak of the COVID-19 virus has had a huge impact on both personal lives and business activities. In light of this, access to data is essential in allowing everyone to make informed decisions regarding day-to-day activities. Johns Hopkins University, which is deeply involved in the international response to the virus, is collecting and publishing data about the outbreak through GitHub, as well as through daily reports and a live dashboard.
In this blog post we will provide you with the details needed to create a Logstash pipeline that brings the data collected by Johns Hopkins University into your own Elasticsearch cluster. We also added the details for this process in a GitHub repository that can be found here.
If you are interested in seeing how Siscale can help you during these uncertain times, check out this page. For more information on ingesting COVID-19 data, read on!
Logstash
Below is the Logstash code required to collect the data, transform it into time series data and import it into Elasticsearch. The code contains comments that explain what each part does.
-- CODE markdown--
input {
#Use the generator input to update the dataset when the pipeline is first started
generator {
lines => [ "first-time-run" ]
count => 1
tags => "check_github"
}
#The http_poller input plugin is used to schedule checks for dataset updates
#The "schedule" setting is defined to check for updates every new hour (at minute 0)
http_poller {
urls => {
check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports"
}
tags => "check_github"
request_timeout => 60
schedule => { "cron" => "0 * * * * UTC" }
codec => "plain"
metadata_target => "http_poller_metadata"
}
}
filter {
#The pipeline will treat two types of events:
#The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data
#The second type is the time series data itself
#This 'if' discriminates between the two types. The time series data is treated later
if "check_github" in [tags] {
ruby {
init => '
require "csv"
require "open-uri"
require "digest"
require "json"
#Path to store the SHA-256 hashes of the downloaded CSV files
#This is used to keep track of any changes to the Github repo and optimize the update process
@github_stored_hashes_path = "/etc/logstash/covid-19-hashes.json"
#Base URL for downloading the time series CSVs
@github_daily_reports_base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/"
@states = { "AL" => "Alabama", "AK" => "Alaska", "AS" => "American Samoa", "AZ" => "Arizona", "AR" => "Arkansas", "CA" => "California", "CO" => "Colorado", "CT" => "Connecticut", "DE" => "Delaware", "DC" => "District of Columbia", "FM" => "Federated States of Micronesia", "FL" => "Florida", "GA" => "Georgia", "GU" => "Guam", "HI" => "Hawaii", "ID" => "Idaho", "IL" => "Illinois", "IN" => "Indiana", "IA" => "Iowa", "KS" => "Kansas", "KY" => "Kentucky", "LA" => "Louisiana", "ME" => "Maine", "MH" => "Marshall Islands", "MD" => "Maryland", "MA" => "Massachusetts", "MI" => "Michigan", "MN" => "Minnesota", "MS" => "Mississippi", "MO" => "Missouri", "MT" => "Montana", "NE" => "Nebraska", "NV" => "Nevada", "NH" => "New Hampshire", "NJ" => "New Jersey", "NM" => "New Mexico", "NY" => "New York", "NC" => "North Carolina", "ND" => "North Dakota", "MP" => "Northern Mariana Islands", "OH" => "Ohio", "OK" => "Oklahoma", "OR" => "Oregon", "PW" => "Palau", "PA" => "Pennsylvania", "PR" => "Puerto Rico", "RI" => "Rhode Island", "SC" => "South Carolina", "SD" => "South Dakota", "TN" => "Tennessee", "TX" => "Texas", "UT" => "Utah", "VT" => "Vermont", "VI" => "Virgin Islands", "VA" => "Virginia", "WA" => "Washington", "WV" => "West Virginia", "WI" => "Wisconsin", "WY" => "Wyoming" }
#Input: a CSV and the date of the CSV
#Output: an array of Logstash events extracted from the CSV that will be sent to Elasticsearch
def create_events(csv, current_date)
totals_by_country = Hash.new
totals_by_country["WORLD"] = LogStash::Event.new
totals_by_country["WORLD"].set("total_confirmed", 0)
totals_by_country["WORLD"].set("total_recovered", 0)
totals_by_country["WORLD"].set("total_deaths", 0)
totals_by_country["WORLD"].tag("total_world")
totals_by_country["WORLD"].tag("covid_time_series")
totals_by_country["WORLD"].set("timestamp", current_date.iso8601(3).to_s)
totals_by_country["WORLD"].set("unique_id", "totals_for_world_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
all_country_region = Array.new
all_province_state = Array.new
all_county = Array.new
all_country_region.append("WORLD")
events_array = Array.new
#For each event..
csv.each do |row|
#..create a new Logstash event that will store the data
new_event = LogStash::Event.new
#..add current row data to the current event
row.each do |k, v|
#new_event[k] = v
new_event.set(k, v&.strip)
end
#..store the date of the reporting
new_event.set("timestamp", current_date.iso8601(3).to_s)
#..store the date when Logstash processed the event
new_event.set("last_process", Time.now.iso8601(3).to_s)
#..some early events contain empty fields when the reported numbers are 0. Fix this here
new_event.set("confirmed", 0) if new_event.get("confirmed").nil?
new_event.set("recovered", 0) if new_event.get("recovered").nil?
new_event.set("deaths", 0) if new_event.get("deaths").nil?
#..calculate the number of active cases and store it, unless the result is negative
active = new_event.get("confirmed").to_i - new_event.get("recovered").to_i - new_event.get("deaths").to_i
new_event.set("active", active) if active >= 0
#..calculate ratios and store them
ratio_deaths_to_confirmed = new_event.get("deaths").to_f / new_event.get("confirmed").to_f
ratio_recovered_to_confirmed = new_event.get("recovered").to_f / new_event.get("confirmed").to_f
ratio_deaths_to_recovered = new_event.get("deaths").to_f / new_event.get("recovered").to_f
new_event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
new_event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
new_event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
if new_event.get("province_state")&.match?(/(.*?),\s(\w{2})/)
matched = new_event.get("province_state").match(/(.*?),\s(\w{2})/)
@states[matched[2]].nil? ? new_event.set("province_state", matched[2]) : new_event.set("province_state", @states[matched[2]])
new_event.set("admin2", matched[1].gsub(/ [Cc]ounty/, ""))
new_event.tag("added_county")
end
#..generate a unique key for the current event
key = ((new_event.get("admin2") || "").downcase + "_" + (new_event.get("province_state") || "").downcase + "_" + (new_event.get("country_region") || "").downcase + "_" + (current_date.strftime("%m-%d-%Y") || "").to_s).gsub(/[^\w\d]/, "_")
new_event.set("unique_id", key)
#..add a tag to identify the event later
new_event.tag("covid_time_series")
events_array.append(new_event)
if ["Channel Islands", "Cayman Islands"].include?(new_event.get("country_region"))
new_event.set("province_state", new_event.get("country_region"))
new_event.set("country_region", "United Kingdom")
end
if ["Saint Martin", "St. Martin"].include?(new_event.get("country_region"))
new_event.set("province_state", "St Martin")
new_event.set("country_region", "France")
end
country_normalization = {
"Mainland China" => "China",
"South Korea" => "Korea, South",
"Republic of Korea" => "Korea, South",
"Iran \(Islamic Republic of\)" => "Iran",
"Bahamas, The" => "Bahamas",
"Gambia, The" => "Gambia",
"Hong Kong SAR" => "China",
"Hong Kong" => "China",
"Macao SAR" => "China",
"Macau" => "China",
"Republic of Ireland" => "Ireland",
"Republic of Moldova" => "Moldova",
"Republic of the Congo" => "Congo (Brazzaville)",
"Russian Federation" => "Russia",
"Taipei and environs" => "Taiwan",
"Taiwan\*" => "Taiwan",
"UK" => "United Kingdom",
"Vatican City" => "Holy See",
"Viet Nam" => "Vietnam",
"occupied Palestinian territory" => "West Bank and Gaza",
"Ivory Coast" => "Cote d\'Ivoire",
"East Timor" => "Timor-Leste",
"Czech Republic" => "Czechia",
"The Bahamas" => "Bahamas",
"The Gambia" => "Gambia",
"Cape Verde" => "Cabo Verde",
"North Ireland" => "Ireland",
"Palestine" => "West Bank and Gaza",
}
new_event.set("country_region", new_event.get("country_region").gsub(/.*/, country_normalization)) unless country_normalization[new_event.get("country_region")].nil?
if totals_by_country[new_event.get("country_region")].nil?
totals_by_country[new_event.get("country_region")] = LogStash::Event.new
totals_by_country[new_event.get("country_region")].set("confirmed", 0)
totals_by_country[new_event.get("country_region")].set("recovered", 0)
totals_by_country[new_event.get("country_region")].set("deaths", 0)
totals_by_country[new_event.get("country_region")].set("country_region", new_event.get("country_region"))
totals_by_country[new_event.get("country_region")].set("timestamp", new_event.get("timestamp"))
totals_by_country[new_event.get("country_region")].tag("total_by_country_region")
totals_by_country[new_event.get("country_region")].tag("covid_time_series")
totals_by_country[new_event.get("country_region")].set("unique_id", "totals_for_" + new_event.get("country_region").downcase + "_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
end
totals_by_country[new_event.get("country_region")].set("confirmed", new_event.get("confirmed").to_i + totals_by_country[new_event.get("country_region")].get("confirmed").to_i)
totals_by_country[new_event.get("country_region")].set("deaths", new_event.get("deaths").to_i + totals_by_country[new_event.get("country_region")].get("deaths").to_i)
totals_by_country[new_event.get("country_region")].set("recovered", new_event.get("recovered").to_i + totals_by_country[new_event.get("country_region")].get("recovered").to_i)
ratio_deaths_to_confirmed = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
ratio_recovered_to_confirmed = totals_by_country[new_event.get("country_region")].get("recovered").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
ratio_deaths_to_recovered = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("recovered").to_f
totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
totals_by_country[new_event.get("country_region")].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
totals_by_country["WORLD"].set("total_confirmed", totals_by_country["WORLD"].get("total_confirmed").to_i + new_event.get("confirmed").to_i)
totals_by_country["WORLD"].set("total_recovered", totals_by_country["WORLD"].get("total_recovered").to_i + new_event.get("recovered").to_i)
totals_by_country["WORLD"].set("total_deaths", totals_by_country["WORLD"].get("total_deaths").to_i + new_event.get("deaths").to_i)
all_country_region.append(new_event.get("country_region")) if !new_event.get("country_region").nil? and !all_country_region.include?(new_event.get("country_region"))
all_province_state.append(new_event.get("province_state")) if !new_event.get("province_state").nil? and !all_province_state.include?(new_event.get("province_state"))
all_county.append(new_event.get("admin2")) if !new_event.get("admin2").nil? and !all_county.include?(new_event.get("admin2"))
end
ratio_deaths_to_confirmed = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
ratio_recovered_to_confirmed = totals_by_country["WORLD"].get("total_recovered").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
ratio_deaths_to_recovered = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_recovered").to_f
totals_by_country["WORLD"].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
totals_by_country["WORLD"].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
totals_by_country["WORLD"].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
totals_by_country["WORLD"].set("country_region", all_country_region)
totals_by_country["WORLD"].set("province_state", all_province_state)
totals_by_country["WORLD"].set("admin2", all_county)
totals_by_country.each do |k, v|
events_array.append(v)
end
return events_array
end
'
code => '
#First day in the dataset
github_daily_reports_first_day = Time.gm(2020, 01, 22, 12, 00)
#Current day
github_daily_reports_last_day = Time.now
current_date = github_daily_reports_first_day
#If the CSV hashes are stored in a file, parse them..
if File.file?(@github_stored_hashes_path)
stored_hashes = JSON.parse(File.read(@github_stored_hashes_path))
#..else, create a new hash
else
stored_hashes = Hash.new
end
#For each day since the beginning of the dataset, until the current day..
while current_date <= github_daily_reports_last_day
begin
#..generate the URL based on the current_day
current_url = @github_daily_reports_base_url + current_date.strftime("%m-%d-%Y") + ".csv"
#..parse the CSV while normalizing the headers
#(e.g. until 21.03.2020, the country is stored under "Country/Region". After this date, it is stored under "Country_Region"
#We normalize the headers such that both variations are ultimately stored under "country_region". This applies to other fields as well)
current_csv = CSV.parse(open(current_url).read.gsub("\uFEFF", ""), headers: true, header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") })
#..hash the CSV
current_md5 = Digest::SHA256.hexdigest(current_csv.to_s)
rescue
#The pipeline will usually fail to fetch the data for the current day, if the data was not already published
logger.warn? and logger.warn("Failed to fetch data for date " + current_date.strftime("%m-%d-%Y"))
else
#..if the current hash is different than the stored one, process the events
if stored_hashes[current_url] != current_md5
events_array = create_events(current_csv, current_date)
#..store the new hash
stored_hashes[current_url] = current_md5
#..and send each event back through the pipeline
events_array.each do |event|
new_event_block.call(event)
end #end each
end #end if
end #end begin
#Check next day (advance by one day, expressed in seconds)
current_date += 86400
end #end while
#Attempt to write the hashes to disk
begin
File.write(@github_stored_hashes_path, stored_hashes.to_json)
logger.info? and logger.info("Successfully updated the cache at #{@github_stored_hashes_path}")
rescue StandardError => ex
logger.error? and logger.error("(#{ex.class} - #{ex.message}) Failed to write hashes to file #{@github_stored_hashes_path}. Does Logstash have read/write access to this location?")
ensure
#..and cancel this event. We are only interested in the time series data, which was already sent back through the pipeline.
event.cancel
end
'
}#end ruby
}#end if
#Each time series data event will be sent back through the pipeline.
#This 'if' discriminates between the original event that triggered the downloading and processing of the CSV, and the time series data
if "covid_time_series" in [tags] {
#Parse date fields
date {
#When the github dataset was last updated by the maintainers
match => [ "last_update", "yyyy-MM-dd HH:mm:ss", "M/d/yy HH:mm", "M/d/yyyy HH:mm", "ISO8601" ]
target => "last_update"
}#end date
date {
#When the event was processed by Logstash
match => ["last_process", "ISO8601" ]
target => "last_process"
}#end date
date {
#The true date of the report
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}#end date
if "total_by_country_region" not in [tags] and "total_world" not in [tags] {
mutate {
#Normalize the location field
rename => {
"lat" => "[location][lat]"
"latitude" => "[location][lat]"
"lon" => "[location][lon]"
"long" => "[location][lon]"
"long_" => "[location][lon]"
"longitude" => "[location][lon]"
}
}
ruby {
code => '
@state_coordinates = { "Wisconsin" => { "lat" => "44.5", "lon" => "-89.5" }, "West Virginia" => { "lat" => "39", "lon" => "-80.5" }, "Vermont" => { "lat" => "44", "lon" => "-72.699997" }, "Texas" => { "lat" => "31", "lon" => "-100" }, "South Dakota" => { "lat" => "44.5", "lon" => "-100" }, "Rhode Island" => { "lat" => "41.700001", "lon" => "-71.5" }, "Oregon" => { "lat" => "44", "lon" => "-120.5" }, "New York" => { "lat" => "43", "lon" => "-75" }, "New Hampshire" => { "lat" => "44", "lon" => "-71.5" }, "Nebraska" => { "lat" => "41.5", "lon" => "-100" }, "Kansas" => { "lat" => "38.5", "lon" => "-98" }, "Mississippi" => { "lat" => "33", "lon" => "-90" }, "Illinois" => { "lat" => "40", "lon" => "-89" }, "Delaware" => { "lat" => "39", "lon" => "-75.5" }, "Connecticut" => { "lat" => "41.599998", "lon" => "-72.699997" }, "Arkansas" => { "lat" => "34.799999", "lon" => "-92.199997" }, "Indiana" => { "lat" => "40.273502", "lon" => "-86.126976" }, "Missouri" => { "lat" => "38.573936", "lon" => "-92.60376" }, "Florida" => { "lat" => "27.994402", "lon" => "-81.760254" }, "Nevada" => { "lat" => "39.876019", "lon" => "-117.224121" }, "Maine" => { "lat" => "45.367584", "lon" => "-68.972168" }, "Michigan" => { "lat" => "44.182205", "lon" => "-84.506836" }, "Georgia" => { "lat" => "33.247875", "lon" => "-83.441162" }, "Hawaii" => { "lat" => "19.741755", "lon" => "-155.844437" }, "Alaska" => { "lat" => "66.160507", "lon" => "-153.369141" }, "Tennessee" => { "lat" => "35.860119", "lon" => "-86.660156" }, "Virginia" => { "lat" => "37.926868", "lon" => "-78.024902" }, "New Jersey" => { "lat" => "39.833851", "lon" => "-74.871826" }, "Kentucky" => { "lat" => "37.839333", "lon" => "-84.27002" }, "North Dakota" => { "lat" => "47.650589", "lon" => "-100.437012" }, "Minnesota" => { "lat" => "46.39241", "lon" => "-94.63623" }, "Oklahoma" => { "lat" => "36.084621", "lon" => "-96.921387" }, "Montana" => { "lat" => "46.96526", "lon" => "-109.533691" }, "Washington" => { "lat" => "47.751076", 
"lon" => "-120.740135" }, "Utah" => { "lat" => "39.41922", "lon" => "-111.950684" }, "Colorado" => { "lat" => "39.113014", "lon" => "-105.358887" }, "Ohio" => { "lat" => "40.367474", "lon" => "-82.996216" }, "Alabama" => { "lat" => "32.31823", "lon" => "-86.902298" }, "Iowa" => { "lat" => "42.032974", "lon" => "-93.581543" }, "New Mexico" => { "lat" => "34.307144", "lon" => "-106.018066" }, "South Carolina" => { "lat" => "33.836082", "lon" => "-81.163727" }, "Pennsylvania" => { "lat" => "41.203323", "lon" => "-77.194527" }, "Arizona" => { "lat" => "34.048927", "lon" => "-111.093735" }, "Maryland" => { "lat" => "39.045753", "lon" => "-76.641273" }, "Massachusetts" => { "lat" => "42.407211", "lon" => "-71.382439" }, "California" => { "lat" => "36.778259", "lon" => "-119.417931" }, "Idaho" => { "lat" => "44.068203", "lon" => "-114.742043" }, "Wyoming" => { "lat" => "43.07597", "lon" => "-107.290283" }, "North Carolina" => { "lat" => "35.782169", "lon" => "-80.793457" }, "Louisiana" => { "lat" => "30.39183", "lon" => "-92.329102" } }
if (event.get("[location][lat]").nil? || event.get("[location][lon]").nil?) || (event.get("[location][lat]").to_f == 0 && event.get("[location][lon]").to_f == 0)
if (event.get("[country_region]") == "US") and (!@state_coordinates[event.get("[province_state]")].nil?)
event.set("[location][lat]", @state_coordinates[event.get("province_state")]["lat"])
event.set("[location][lon]", @state_coordinates[event.get("province_state")]["lon"])
else
event.set("[location][lat]", 0)
event.set("[location][lon]", 0)
end
end
'
}
}
#Rename the fields if you want to correlate with other datasets
mutate {
rename => {
"province_state" => "province_state"
"country_region" => "country_region"
"admin2" => "county"
"fips" => "fips"
"confirmed" => "confirmed"
"deaths" => "deaths"
"recovered" => "recovered"
"active" => "active"
"[location][lat]" => "[location][lat]"
"[location][lon]" => "[location][lon]"
"last_update" => "last_update"
"last_process" => "last_process"
"combined_key" => "combined_key"
"ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
"ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
"ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
"unique_id" => "unique_id"
}
}#end mutate
}#end if
}#end filter
output {
#Send the data to Elasticsearch
elasticsearch {
#Add your Elasticsearch hosts
hosts => [""]
#Add the index name
index => "covid-19-live-update"
#Add Elasticsearch credentials
user => ""
password => ""
#Add SSL details
#ssl => true
#cacert => "/path/to/cert/"
#Update documents based on the unique id that we defined
action => "update"
document_id => "%{unique_id}"
#..or create a new document if it doesn't already exist
doc_as_upsert => true
manage_template => false
}
}
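To make the header normalization in the pipeline above more concrete, here is a minimal standalone Ruby sketch of that one step. It uses the same BOM stripping and the same header converter as the pipeline. The helper name parse_report and the two sample CSV snippets are made up for illustration and are not copied from the dataset.

```ruby
require "csv"

# Hypothetical helper (not part of the pipeline): strips a leading BOM and
# parses the CSV with lower-cased headers in which every non-word character
# is replaced by an underscore.
def parse_report(raw)
  CSV.parse(
    raw.gsub("\uFEFF", ""),
    headers: true,
    header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") }
  )
end

# Illustrative rows only: one in the older header style, one in the newer style.
OLD_FORMAT = "\uFEFFProvince/State,Country/Region,Last Update,Confirmed\n" \
             "Hubei,Mainland China,1/22/2020 17:00,444\n"
NEW_FORMAT = "FIPS,Admin2,Province_State,Country_Region,Last_Update,Confirmed\n" \
             "45001,Abbeville,South Carolina,US,2020-03-23 23:19:34,1\n"

old_rows = parse_report(OLD_FORMAT)
new_rows = parse_report(NEW_FORMAT)

# Both header styles end up under the same field names.
puts old_rows.headers.inspect
puts new_rows.headers.inspect
puts old_rows.first["country_region"]
puts new_rows.first["country_region"]
```

Because both variants resolve to country_region, province_state and so on, the rest of the filter section can refer to a single set of field names regardless of the report date.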
Elasticsearch index
In addition to the Logstash code, you will also need an index template in Elasticsearch. The field mapping can be found below (feel free to change the name of the index, but remember to change it in the Logstash code as well):
-- CODE markdown--
PUT _template/covid-19-live-update
{
"order" : 0,
"index_patterns" : [
"covid-19-live-update"
],
"settings" : { },
"mappings" : {
"properties" : {
"recovered" : {
"type" : "long"
},
"updated_at" : {
"type" : "date"
},
"ratio_deaths_to_recovered" : {
"type" : "float"
},
"ratio_recovered_to_confirmed" : {
"type" : "float"
},
"ratio_deaths_to_confirmed" : {
"type" : "float"
},
"location" : {
"type" : "geo_point"
},
"confirmed" : {
"type" : "long"
},
"deaths" : {
"type" : "long"
},
"last_update" : {
"type" : "date"
},
"last_process" : {
"type" : "date"
},
"@timestamp" : {
"type" : "date"
}
}
},
"aliases" : { }
}
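The three ratio_* fields are mapped as float, but not every document will contain them: the pipeline only stores a ratio when the division produces a finite number. The Ruby sketch below isolates that guard. The helper name safe_ratio is hypothetical; the pipeline performs the same check inline.

```ruby
# Hypothetical helper mirroring the pipeline's inline guard: returns the
# quotient only when it is a finite number, otherwise nil (meaning the
# field would simply not be set on the event).
def safe_ratio(numerator, denominator)
  ratio = numerator.to_f / denominator.to_f
  ratio.finite? ? ratio : nil
end

# CSV values arrive as strings, so the examples use strings as well.
puts safe_ratio("5", "100").inspect # regular case: a finite float
puts safe_ratio("0", "0").inspect   # 0.0 / 0.0 is NaN, so nil
puts safe_ratio("3", "0").inspect   # 3.0 / 0.0 is Infinity, so nil
puts safe_ratio(nil, "10").inspect  # nil.to_f is 0.0, so the ratio is 0.0
```

In practice this means that a region with zero confirmed or zero recovered cases will be indexed without the corresponding ratio field, rather than with a placeholder value.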
Kibana dashboard
In the GitHub repository you will also find the dashboard from this article, which you can import into your own Kibana. The dashboard was built using the latest version of the Elastic Stack at the time of writing (7.6.1). To import the dashboard, navigate in the Kibana interface to Management > Saved Objects and click Import.
The GitHub repository can be found here.
Watcher alerting
Once these steps are done, you will have the data in Elasticsearch and available through Kibana.
We are also attaching two Watcher definitions (alerting) to this article. Each sends an email to the configured address with the number of confirmed, recovered and deceased cases for the region of interest. Because not all countries have information at the province/region level (this is how the data is collected by Johns Hopkins University), there is one Watcher for regional data and one for country-wide data. In the code you will find the placeholder "Change this", which should be replaced with the desired value.
A few considerations for the alerts:
- Make sure to change only the text within the double quotes, without deleting the quotes themselves
- Province and country names should be capitalized (e.g. New York, China)
- Feel free to change the interval value at the top of the code to the interval at which you want the alerting to occur (a number followed by m for minutes, h for hours or d for days; keep the double quotes)
- Keep in mind that, from our observations, the data from Johns Hopkins University is updated once or twice a day
- The following countries have regional information:
  - US
  - China
  - Australia
  - Canada
  - France
  - United Kingdom
  - Denmark
  - Cruise Ship
- In order to be able to send emails you need to configure your Elastic Stack accordingly. Details can be found here.
Province/Region Watcher alert
-- CODE markdown--
{
"trigger": {
"schedule": {
"interval": "24h"
}
},
"input": {
"search": {
"request": {
"body": {
"query" : {
"bool" : {
"must" : [
{
"match" : {
"province_state.keyword" : "Change this"
}
}
]
}
},
"size" : 2,
"sort" : {
"@timestamp" : {
"order" : "desc"
}
}
},
"indices": [
"covid-19-live-update"
]
}
}
},
"condition": {
"compare" : {
"ctx.payload.hits.total" : {
"gt" : 1
}
}
},
"transform" : {
"script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed)), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths)), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered))]"
},
"actions": {
"email" : {
"email" : {
"to" : "username@example.org",
"subject" : "COVID-19 Status for {{ctx.payload.province}}, {{ctx.payload.country}}",
"body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})"
}
}
}
}
Country level Watcher alert
-- CODE markdown--
{
"trigger": {
"schedule": {
"interval": "24h"
}
},
"input": {
"search": {
"request": {
"body": {
"query" : {
"bool" : {
"must" : [
{
"match" : {
"country_region.keyword" : "Change this"
}
}
]
}
},
"size" : 2,
"sort" : {
"@timestamp" : {
"order" : "desc"
}
}
},
"indices": [
"covid-19-live-update"
]
}
}
},
"condition": {
"compare" : {
"ctx.payload.hits.total" : {
"gt" : 1
}
}
},
"transform" : {
"script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed)), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths)), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered))]"
},
"actions": {
"email" : {
"email" : {
"to" : "username@example.org",
"subject" : "COVID-19 Status for {{ctx.payload.country}}",
"body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})"
}
}
}
}
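For readers less familiar with Painless, the transform used in both Watchers boils down to the logic below, sketched here in Ruby purely as an illustration. It assumes the two hits arrive newest first, as the descending sort on @timestamp requests. The function name daily_differences and the sample numbers are invented for this example.

```ruby
# Fields compared between the two most recent documents.
FIELDS = %w[confirmed deaths recovered].freeze

# Hypothetical helper: takes the _source of two hits (newest first) and
# builds the same kind of payload the Watcher transform returns.
def daily_differences(hits)
  today = hits[0]
  yesterday = hits[1]
  FIELDS.each_with_object({}) do |field, payload|
    payload["#{field}_today"] = today[field]
    payload["#{field}_yesterday"] = yesterday[field]
    # Integer() accepts both "120" and 120; the Painless script in the
    # Watchers uses Integer.parseInt on the stored values instead.
    payload["difference_#{field}"] = Integer(today[field]) - Integer(yesterday[field])
  end
end

# Made-up sample values, shaped like the documents written by the pipeline.
SAMPLE_HITS = [
  { "confirmed" => "120", "deaths" => "4", "recovered" => "30" },
  { "confirmed" => "100", "deaths" => "3", "recovered" => "25" }
].freeze

payload = daily_differences(SAMPLE_HITS)
puts "Confirmed: #{payload["confirmed_today"]} (+#{payload["difference_confirmed"]})"
puts "Recovered: #{payload["recovered_today"]} (+#{payload["difference_recovered"]})"
puts "Deaths: #{payload["deaths_today"]} (+#{payload["difference_deaths"]})"
```

Note that the email body in the Watchers hard-codes the plus sign, so if a reported number is corrected downwards the difference will be shown as "+-N".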
In part 2 of this blog post we will also show how you can correlate the Coronavirus data with other data sets. This will help businesses to correlate their operational and business data with the COVID-19 outbreak in order to adjust their activities.
If you require help or assistance with this process please contact us at contact@siscale.com