Ciencia De Datos Para Startups: Data Pipelines - Parte 2

Ben Weber
Apr 14, 2020

Contents Outline

Ciencia De Datos Para Startups: Data Pipelines - Parte 2

Apr 14, 2020 15 minutes read

Una tubería escalable

Siguiendo el hilo anterior sobre data pipeline, en esta ocasión construiremos una tubería de datos que reciba los eventos usando el PuSub de Google como endpoint, y guardaremos los eventos en un lago de datos y una base de datos.

El enfoque presentado aquí guardará los eventos como datos en bruto (raw data), pero también discutiré las formas de transformar los eventos en datos procesados.

La tubería de datos que realiza toda esta funcionalidad es relativamente simple. El pipeline lee los mensajes de PubSub y luego transforma los eventos para su persistencia: la porción de BigQuery del pipeline convierte los mensajes en objetos de TableRow y los transmite directamente a BigQuery, mientras que la porción de AVRO del pipeline agrupa los eventos en ventanas discretas y luego guarda los eventos en Google Storage.

El gráfico de operaciones se muestra en la siguiente figura.


El canal de transmisión desplegado en Google Cloud



Establecer el Environment...

El primer paso para construir una tubería de datos es establecer las dependencias necesarias para compilar y desplegar el proyecto. 

Utilicé las siguientes dependencias para configurar los entornos de la API de rastreo que envía los eventos a la tubería, y la tubería de datos que procesa los eventos.

<!-- Dependencies for the Tracking API ->
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.32.0-beta</version>
  </dependency>
</dependencies>
<!-- Dependencies for the data pipeline ->
<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.2.0</version>
</dependency>

Usé Eclipse para crear y compilar el código de este tutorial, ya que es de código abierto. Sin embargo, otros IDEs como IntelliJ proporcionan características adicionales para desplegar y monitorear las tareas de DataFlow.

Antes de poder implementar las tareas en Google Cloud, deberá configurar una cuenta de servicio tanto para PubSub como para DataFlow. La configuración de estas credenciales está fuera del alcance de este artículo, y hay más detalles disponibles en la documentación de Google.

Un requisito previo adicional para ejecutar este flujo de datos es la creación de un topic de PubSub en GCP. Definí un topic de eventos en bruto que se utiliza para publicar y consumir los mensajes para la tubería de datos. Puede encontrar más detalles sobre la creación de un topic PubSub aquí.

Para implementar este canal de datos, necesitarás configurar un entorno java con las dependencias maven mencionadas anteriormente, configurar un proyecto de Google Cloud y habilitar la facturación, habilitar la facturación en los servicios de almacenamiento y BigQuery, y crear un tema PubSub para el envío y la recepción de mensajes.

Todos estos servicios gestionados cuestan dinero, pero hay un nivel gratuito que puede utilizarse para crear un prototipo de una tubería de datos.



Sending events from a server to a PubSub topic


Eventos de publicación
Para construir una tubería de datos utilizable, es útil construir APIs que encapsulen los detalles del envío de datos de eventos. 

La clase de API de rastreo proporciona esta funcionalidad, y puede utilizarse para enviar datos de eventos generados al conducto de datos. El código que se muestra a continuación muestra la firma del método para el envío de eventos, y muestra cómo generar datos de muestra.



/** Event Signature for the Tracking API 
public void sendEvent(String eventType, String eventVersion, HashMap<String, String> attributes);
*/
// send a batch of events    
for (int i=0; i<10000; i++) {
// generate event names      
  String eventType = Math.random() < 0.5 ? 
      "Session" : (Math.random() < 0.5 ? "Login" : "MatchStart");
// create attributes to send      
  HashMap<String, String> attributes = new HashMap<String,String>();
  attributes.put("userID", "" + (int)(Math.random()*10000));
  attributes.put("deviceType", Math.random() < 0.5 ? 
      "Android" : (Math.random() < 0.5 ? "iOS" : "Web"));
// send the event      
  tracking.sendEvent(eventType, "V1", attributes);      
}


La API de rastreo establece una conexión con un topic PubSub, pasa los eventos como un formato JSON, e implementa una llamada de retorno para la notificación de fallas en la entrega.

El código utilizado para enviar eventos se proporciona a continuación, y se basa en el ejemplo de PubSub de Google que se proporciona aquí.


// Setup a PubSub connection 
TopicName topicName = TopicName.of(projectID, topicID);
Publisher publisher = Publisher.newBuilder(topicName).build();
// Specify an event to send
String event = {\"eventType\":\"session\",\"eventVersion\":\"1\"}";
// Convert the event to bytes    
ByteString data = ByteString.copyFromUtf8(event.toString());
//schedule a message to be published    
PubsubMessage pubsubMessage = 
  PubsubMessage.newBuilder().setData(data).build();
// publish the message, and add this class as a callback listener
ApiFuture<String> future = publisher.publish(pubsubMessage);    ApiFutures.addCallback(future, this);

El código de arriba permite a las aplicaciones enviar eventos a un tema de PubSub. El siguiente paso es procesar estos eventos en un entorno totalmente gestionado que puede escalar según sea necesario para satisfacer la demanda.

Almacenamiento de eventos

Una de las funciones clave de una tubería de datos es poner los eventos instrumentados a disposición de los equipos de ciencia y análisis de datos para su análisis.

Las fuentes de datos utilizadas como endpoints deben tener una baja latencia y ser capaces de escalar hasta un volumen masivo de eventos.

El conducto de datos definido en este tutorial muestra cómo producir eventos tanto para BigQuery como para un lago de datos que puede utilizarse para dar soporte a un gran número de usuarios de empresas de análisis.


Streaming event data from PubSub to DataFlow


El primer paso en esta tubería de datos es leer los eventos de un topic PubSub y pasar los mensajes ingeridos al proceso de DataFlow. 

DataFlow proporciona un conector PubSub que permite la transmisión de mensajes PubSub a otros componentes del DataFlow. 

El código que se muestra a continuación muestra cómo instanciar el conducto de datos, especificar el modo de transmisión y consumir los mensajes de un topic PubSub específico. 

El resultado de este proceso es una colección de mensajes PubSub que se pueden almacenar para su posterior análisis.


// set up pipeline options    
Options options = PipelineOptionsFactory.fromArgs(args)
  .withValidation().as(Options.class);    
options.setStreaming(true);    
Pipeline pipeline = Pipeline.create(options);
// read game events from PubSub    
PCollection<PubsubMessage> events = pipeline
  .apply(PubsubIO.readMessages().fromTopic(topic));

La primera forma en que queremos almacenar los eventos es en un formato de columnas que puede ser usado para construir un lago de datos.

Aunque este post no muestra cómo utilizar estos archivos en ETLs río abajo, tener un lago de datos es una gran manera de mantener una copia de su conjunto de datos en caso de que necesite hacer cambios en su base de datos.

El lago de datos proporciona una manera de volver a cargar sus datos si es necesario debido a cambios en los esquemas o problemas de ingestión de datos. A continuación se muestra la parte del lago de datos asignada a este proceso.



Batching events to AVRO format and saving to Google Storage

Para la AVRO, no podemos usar un enfoque de transmisión directa. Necesitamos agrupar los eventos en lotes antes de poder guardarlos en archivos planos. 

La forma en que esto se puede lograr en DataFlow es aplicando una función de ventanas que agrupa los eventos en lotes fijos. 

El código que se muestra a continuación aplica transformaciones que convierten los mensajes PubSub en objetos String, agrupa los mensajes en intervalos de 5 minutos y envía los lotes resultantes a archivos AVRO en Google Storage.


// AVRO output portion of the pipeline    
events
.apply("To String", ParDo.of(new DoFn<PubsubMessage, String>() {
  @ProcessElement        
  public void processElement(ProcessContext c) throws Exception {
    String message = new String(c.element().getPayload());
    c.output(message);        
  }      
}))
// Batch events into 5 minute windows      
.apply("Batch Events", Window.<String>into(    
    FixedWindows.of(Duration.standardMinutes(5)))       
  .triggering(AfterWatermark.pastEndOfWindow())     
  .discardingFiredPanes()              
  .withAllowedLateness(Duration.standardMinutes(5)))
// Save the events in ARVO format      
.apply("To AVRO", AvroIO.write(String.class)
  .to("gs://your_gs_bucket/avro/raw-events.avro")
  .withWindowedWrites() 
  .withNumShards(8)
  .withSuffix(".avro"));

Para resumir, el código anterior agrupa los eventos en ventanas de 5 minutos y luego exporta los eventos a archivos AVRO en Google Storage.

El resultado de esta porción de la tubería de datos es una colección de archivos AVRO en el almacenamiento de Google que puede ser usada para construir un lago de datos.

Cada 5 minutos se genera una nueva salida de AVRO, y los ETL pueden analizar los eventos en bruto en esquemas de tablas específicas de eventos procesados. La imagen de abajo muestra una salida de los archivos de AVRO.


AVRO files saved to Google Storage

Además de crear un lago de datos, queremos que los eventos sean accesibles inmediatamente en un entorno de consulta.

DataFlow proporciona un conector BigQuery que sirve esta funcionalidad, y los datos enviados a este endpoint están disponibles para su análisis después de una corta duración.

Esta porción de la tubería de datos se muestra en la siguiente figura.



Streaming events from DataFlow to BigQuery

La tubería de datos convierte los mensajes PubSub en objetos TableRow, que pueden ser insertados directamente en BigQuery. 

El código de abajo consiste en dos métodos de aplicación: una transformación de datos y un escritor IO. 

El paso de transformación lee las cargas útiles de los mensajes de PubSub, analiza el mensaje como un objeto JSON, extrae los atributos eventType y eventVersion, y crea un objeto TableRow con estos atributos además de una marca de tiempo y la carga útil del mensaje. 

El segundo método de aplicación le dice al pipeline que escriba los registros en BigQuery y que añada los eventos a una tabla existente.


// parse the PubSub events and create rows to insert into BigQuery    events.apply("To Table Rows", new 
  PTransform<PCollection<PubsubMessage>, PCollection<TableRow>>() { 
    public PCollection<TableRow> expand(
        PCollection<PubsubMessage> input) {       
 
      return input.apply("To Predictions", ParDo.of(new  
          DoFn<PubsubMessage, TableRow>() {    
     
    @ProcessElement          
    public void processElement(ProcessContext c) throws Exception {
      String message = new String(c.element().getPayload()); 
 
      // parse the json message for attributes
      JsonObject jsonObject = 
          new JsonParser().parse(message).getAsJsonObject();
      String eventType = jsonObject.get("eventType").getAsString();
      String eventVersion = jsonObject.
              get("eventVersion").getAsString();          
      String serverTime = dateFormat.format(new Date()); 
 
     // create and output the table row            
     TableRow record = new TableRow();            
     record.set("eventType", eventType);               
     record.set("eventVersion", eventVersion);          
     record.set("serverTime", serverTime);
     record.set("message", message);            
     c.output(record);          
  }}));      
}})
 
//stream the events to Big Query    
.apply("To BigQuery",BigQueryIO.writeTableRows()   
  .to(table)           
  .withSchema(schema)
  .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND));

Para resumir el código anterior, cada mensaje que se consume de PubSub se convierte en un objeto de TableRow con una marca de tiempo y luego se transmite a BigQuery para su almacenamiento.

El resultado de esta porción de la tubería de datos es que los eventos serán transmitidos a BigQuery y estarán disponibles para su análisis en la tabla de salida especificada por la tarea de DataFlow.

Para utilizar eficazmente estos eventos para las consultas, necesitará construir ETLs adicionales para crear tablas de eventos procesados con registros esquematizados, pero ahora tiene un mecanismo de recolección de datos para almacenar eventos de seguimiento.



Game event records queried from the raw-events table in BigQuery

Despliegue y escalado automático

Con DataFlow puede probar la tubería de datos localmente o desplegarla en la nube. Si ejecuta los ejemplos de código sin especificar atributos adicionales, entonces la tubería de datos se ejecutará en su máquina local. 

Para poder realizar el despliegue en la nube y aprovechar las capacidades de escalado automático de este conducto de datos, debe especificar una nueva clase de Runner como parte de sus argumentos de tiempo de ejecución. Para ejecutar el conducto de datos, utilicé los siguientes argumentos de tiempo de ejecución:


--runner=org.apache.beam.runners.dataflow.DataflowRunner 
--jobName=game-analytics
--project=your_project_id 
--tempLocation=gs://temp-bucket

Una vez que el trabajo se despliega, deberías ver un mensaje de que el trabajo ha sido enviado. Entonces puede hacer clic en la consola de DataFlow para ver la tarea:


The steaming data pipeline running on Google Cloud

La configuración de tiempo de ejecución especificada anteriormente no se predeterminará en una configuración de escalado automático. 

Para implementar un trabajo que se escalará según la demanda, deberá especificar atributos adicionales, como por ejemplo:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=30

Detalles adicionales sobre la configuración de una tarea de DataFlow para escalar a condiciones de gran carga de trabajo están disponibles en este artículo de Google y en esta entrada de Spotify.

La imagen de abajo muestra cómo DataFlow puede escalar para satisfacer la demanda según sea necesario.


An example of Dataflow auto scaling. The pipeline will scale up and down as necessary to match demand.

De los datos en bruto a eventos procesados.

El conducto presentado hasta ahora guarda los eventos de rastreo como datos en bruto. Para traducir estos eventos a datos procesados, necesitaremos aplicar esquemas específicos de eventos. Hay algunos enfoques diferentes que podemos tomar con esta tubería:

  1. Aplicar los esquemas en el conducto actual de DataFlow y guardar en BigQuery
  2. Aplicar los esquemas en la tubería actual y enviar a un nuevo PubSub
  3. Aplicar atributos adicionales a los eventos en bruto y enviar a un nuevo PubSub
  4. Usar los ETLs posteriores para aplicar los esquemas

El primer enfoque es el más sencillo, pero no ofrece una buena solución para actualizar las definiciones de los eventos si es necesario. 

Este enfoque puede ser implementado como se muestra en el siguiente código, que muestra cómo filtrar y analizar los eventos MatchStart para su entrada en BigQuery.


events.apply("To MatchStart Events", ParDo.of(
    new DoFn<PubsubMessage, TableRow>() {
@ProcessElement 
public void processElement(ProcessContext c) throws Exception {
  String message = new String(c.element().getPayload());
JsonObject jsonObject = new 
      JsonParser().parse(message).getAsJsonObject();
  String eventType = jsonObject.get("eventType").getAsString();
  String version = jsonObject.get("eventVersion").getAsString();
  String serverTime = dateFormat.format(new Date());

  // Filter for MatchStart events
  if (eventType.equals("MatchStart")) {

    TableRow record = new TableRow();
    record.set("eventType", eventType);
    record.set("eventVersion", version);
    record.set("server_time", serverTime);

    // event specifc attributes
    record.set("userID", jsonObject.get("userID").getAsString());
    record.set("type", jsonObject.get("deviceType").getAsString());
    c.output(record);
  }
}}))
.apply("To BigQuery",BigQueryIO.writeTableRows()

Para implementar este enfoque, necesitaría crear una nueva implementación de DoFn para cada tipo de evento.

El segundo enfoque es similar al primero, pero en lugar de pasar los eventos analizados a BigQuery, se pasan a un nuevo topic de PubSub.

Es posible enviar varios tipos de eventos a un solo tema o crear un tema por evento. El inconveniente de utilizar los dos primeros enfoques es que la lógica de análisis de mensajes forma parte de la cadena de eventos en bruto. Esto significa que cambiar las definiciones de los eventos implica reiniciar la tubería.



The streaming pipeline with an additional output:


Un tercer enfoque que puede utilizarse es el envío de eventos sin procesar con atributos adicionales a otro topic del PubSub. 

Un segundo trabajo de flujo de datos puede ser configurado para analizar los eventos según sea necesario. 

El código que se muestra a continuación muestra cómo analizar los eventos sin procesar, agregar atributos adicionales al mensaje PubSub para filtrarlos y publicar los eventos en un segundo topic. 

Este enfoque permite cambiar las definiciones de los eventos sin necesidad de reiniciar el flujo de eventos sin procesar.


# topic for raw events with additional attributes 
private static String processed = 
  "projects/your_project_id/topics/processed-events";
events.apply("PubSub Processed", 
  ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {             
  @ProcessElement            
  public void processElement(ProcessContext c) throws Exception { 
    String message = new String(c.element().getPayload());   
   
    // parse the JSON message for attributes 
    JsonObject jsonObject = new 
        JsonParser().parse(message).getAsJsonObject(); 
    String eventType = jsonObject.get("eventType").getAsString(); 
    // Add additional attributes for filtering 
    HashMap<String, String> atts = new HashMap();               
    atts.put("EventType", eventType);               
    PubsubMessage out = new PubsubMessage(message.getBytes(), atts);
    c.output(out);                                                 
  }  
}))     
.apply(PubsubIO.writeMessages().to(processed));

Un cuarto enfoque que puede utilizarse es hacer que los procesos de ETL posteriores apliquen esquemas a los sucesos sin procesar y separen la tabla de sucesos sin procesar en tablas de sucesos específicos. Cubriremos este enfoque en el próximo artículo.

Conclusión

Este post ha proporcionado una introducción a la construcción de una tubería de datos para una startup.

Hemos cubierto los tipos de datos en una tubería, las propiedades deseadas de una tubería de datos de alto funcionamiento, la evolución de las tuberías de datos, y una tubería de muestra construida sobre GCP (Google Cloud Platform).

Ahora hay una variedad de herramientas disponibles que hacen posible establecer una tubería de análisis para una aplicación con un esfuerzo mínimo.

El uso de recursos gestionados permite a los equipos pequeños aprovechar la infraestructura sin servidores y de escalado automático para escalar a volúmenes de eventos masivos con una gestión de infraestructura mínima.

En lugar de utilizar la solución estándar de un proveedor de datos para recopilar datos, puede registrar todos los datos relevantes para su aplicación.

Si bien el enfoque que se presenta aquí no es directamente portable a otras nubes, la biblioteca Apache Beam que se utiliza para implementar la funcionalidad principal de este conducto de datos es portable y herramientas similares pueden aprovecharse para construir conductos de datos escalables en otros proveedores de nubes.

El código fuente completo de este pipeline de muestra está disponible en Github.
Join our private community in Discord

Keep up to date by participating in our global community of data scientists and AI enthusiasts. We discuss the latest developments in data science competitions, new techniques for solving complex challenges, AI and machine learning models, and much more!