The Orchestration namespace

Apps Script (advanced level) posted on 8th Jan 2018


Take a moment to read these if this is a new topic for you.
Code is on github

The Orchestration of the stages is the trickiest part to get your head around, but most of the work is done by methods available from the Server namespace.
We'll use the usual dashboard to refer back to. This is kind of a long read, and the details of my specific application are unimportant, but it does demonstrate each of the scenarios you are likely to encounter.


Getting started. 

Each Orchestration session begins with an .init() which is kicked off from the client side. An Orchestration namespace must include an init method which creates a work package. Here's mine. The Job namespace returns a work package tailored for the particular Fusion table I want to process.
  // This is called by server to get info on what the work is
  ns.init = function () {
  
    // get the work
    const work = Job.getJob ("performance");
   
    // keep to this pattern for initialization
    return Server.initializeWork (work);
  };

It also must return a call to Server.initializeWork and pass the work package you need it to deal with. For the contents of this work package see Creating a work package

When this is complete the dashboard will update this stage.
Note that your app will look nothing like mine from now on, but all the principles will be covered in each of the following stage descriptions.

Counting stage

The next stage in my App is to go to the Fusion table and see how many rows it contains. Each Orchestrate function receives a standard argument, which looks like this
{
  stages 
  stageIndex
  chunkIndex
}

where
  • stages - all the info from the work package, plus calculated thread and chunk information
  • stageIndex - the index in the work package 
  • chunkIndex - the chunk or thread index. If there is no parallelism (maxThreads = 1) , then this will always be 0.
Any arguments for this method will be in stages[stageIndex].instances.arg
  /**
  * @param {object} pack (stages, stageIndex, chunkIndex)
  * @return {object} updated chunk
  */
  ns.getCount = function (pack) {
   
    const stage = pack.stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    const arg = stage.instances.arg;
    
    // do the work - any arguments will be as below
    const count = FusionToDB.getCount (arg)[0].count;
    
    // register it for future reference
    // normally the count of items will be the length of result, but it can
    // be overridden by a number or a function in the 4th argument
    return Server.registerChunk (stage, chunk , [count] , arg.maxr ? Math.min(count,arg.maxr) : count );

  };
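
To make the shape of the standard argument concrete, here is a minimal sketch that runs outside Apps Script. The unpack helper and the example pack are made up for illustration and are not part of GasThreader; only the property names (stages, stageIndex, chunkIndex, chunks, instances.arg) come from the description above.

```javascript
// Hypothetical helper: pull out the three things every stage method needs.
function unpack (pack) {
  const stage = pack.stages[pack.stageIndex];
  return {
    stage: stage,
    chunk: stage.chunks[pack.chunkIndex],
    arg: stage.instances.arg
  };
}

// A made-up pack: one stage, split into two chunks, currently on the second chunk
const examplePack = {
  stages: [{
    stage: "getCount",
    instances: { namespace: "Orchestration", method: "getCount", arg: { table: "demo" } },
    chunks: [
      { chunkIndex: 0, start: 0, chunkSize: 1 },
      { chunkIndex: 1, start: 1, chunkSize: 1 }
    ]
  }],
  stageIndex: 0,
  chunkIndex: 1
};

const parts = unpack(examplePack);
```

Here parts.chunk is the second chunk and parts.arg is whatever the work package supplied as arg for that stage.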

Every Orchestration method must end with a call to Server.registerChunk with these arguments
  • stage - the stage being processed (stages[stageIndex])
  • chunk - the chunk being processed (stages[stageIndex].chunks[chunkIndex])
  • result - an array of results from this chunk
  • optionally a resultCount value - in most cases this can be omitted and the length of the result array will be used. However in this case, there was only one result - a count of the number of rows in a table. I want to signal the next stage that it will have that count number of items to process. This will be used to calculate the optimum number of parallel threads to use to retrieve the data.
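
As a rough illustration of the optional resultCount argument, the sketch below shows one way a number, a function, or nothing at all could be turned into a count. The function resolveResultCount is hypothetical and is not the Server namespace's actual code; the fallback of 1 for a non-array result is an assumption too.

```javascript
// Hypothetical stand-in for how the optional 4th argument could be interpreted.
function resolveResultCount (result, resultCount) {
  // a function gets the result and works out the count itself
  if (typeof resultCount === "function") return resultCount(result);
  // a number is used as is
  if (typeof resultCount === "number") return resultCount;
  // default: the length of the result array (assumption: a non-array counts as 1 item)
  return Array.isArray(result) ? result.length : 1;
}

// the counting stage: one result, but it signals a million items to the next stage
const rowCount = 1000000;
const signalled = resolveResultCount([rowCount], rowCount);

// with a cap such as arg.maxr, the smaller of the two would be passed instead
const capped = resolveResultCount([rowCount], Math.min(rowCount, 5000));
```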

Get data stage

Next stage is to get the data from the database. The count came from the previous stage, and GasThreader has calculated that 24 would be a good number of threads to run in parallel. This means that the count from the previous stage has been used to distribute the work amongst the 24 threads. I have all my app specific code in the FusionToDB namespace so it's not important to look at that. What is important is that it's being told to retrieve a certain number of rows, starting at a particular position.
  ns.getData = function (pack) {
    
    // get the reduced stage - all the data
    const stage = pack.stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    
    // Important
    // if you need to modify arg - clone it first
    const arg = Utils.clone(stage.instances.arg);
    
    // in this case there's no data to get,
    // as the previous stage was just a count
    
    // arg contains the info needed to open the db
    // add the slice I'm doing
    arg.maxr = chunk.chunkSize;
    arg.start = chunk.start;
    
    // there's other chunking that happens in Fusion too
    // as there's a limit to how much can be read in one go
    // that's dealt with invisibly here
    const result = FusionToDB.getChunked (arg);

    // register it for future reference
    return Server.registerChunk (stage, chunk, result);
  
  };


Each of the 24 threads will receive a different value for 
chunk.chunkSize
chunk.start

so they can all do their work simultaneously. When complete the dashboard looks like this.
Do not modify stage.instances.arg - as it is part of the stage object which will be carried forward to the next stages. If you do need to fiddle with arg - as I do in this example for demonstration - clone it first. You can use Utils.clone().
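
A small sketch of why the clone matters. It uses a JSON round trip as a stand-in for Utils.clone, whose real implementation isn't shown here, and the stage object is made up.

```javascript
// Stand-in for Utils.clone (assumption: a plain deep copy is all that's needed)
function cloneSketch (obj) {
  return JSON.parse(JSON.stringify(obj));
}

// a made-up stage carrying the arg from the work package
const stageSketch = { instances: { arg: { table: "demo" } } };

// work on a copy, so the slice details stay local to this chunk
const safeArg = cloneSketch(stageSketch.instances.arg);
safeArg.maxr = 100;
safeArg.start = 200;
```

After this, stageSketch.instances.arg still only contains table, so nothing chunk specific gets carried forward to later stages.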

As usual a call to registerChunk will record the results of this thread. You can see from the work package below that the reduce step will be skipped for this stage. This is because 1m rows would use too much memory to deal with all at once, so the next stage will deal with the result data in chunks.
 {
          stage: "getData",
          stageTitle: "Getting data",
          skipReduce: true,

          // the maximum number of chunks to break the job into omit for the default
          maxThreads: 24,

          // the min size of a chunk to avoid splitting job into inefficiently small chunks
          // omit for default
          minChunkSize: 200,

          // the
          instances: {
            namespace: "Orchestration",
            method: "getData",
            arg: fusion
          }
        },
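
To show how maxThreads and minChunkSize could interact with the count from the previous stage, here is a minimal sketch of a chunk planner. It is only an illustration of the idea. planChunks is a hypothetical function, and GasThreader's real calculation may well differ.

```javascript
// Hypothetical chunk planner - not GasThreader's real algorithm.
function planChunks (count, maxThreads, minChunkSize) {
  // never use so many threads that a chunk would drop below minChunkSize
  const byMinSize = Math.max(1, Math.floor(count / minChunkSize));
  const threads = Math.max(1, Math.min(maxThreads, byMinSize));

  // share the items out evenly, spreading any remainder over the first chunks
  const base = Math.floor(count / threads);
  const extra = count % threads;

  const chunks = [];
  var start = 0;
  for (var i = 0; i < threads; i++) {
    const chunkSize = base + (i < extra ? 1 : 0);
    chunks.push({ chunkIndex: i, start: start, chunkSize: chunkSize });
    start += chunkSize;
  }
  return chunks;
}

const bigPlan = planChunks(1000000, 24, 200);   // plenty of rows: all 24 threads used
const smallPlan = planChunks(500, 24, 200);     // few rows: only 2 chunks are worthwhile
```

Each entry carries the start and chunkSize that a stage method would read from its chunk.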

Define fields stage

This stage is going to look at all the data to find the maximum length of the values in each field and deduce the best data type to use when creating the SQL table definition. It's fine to run this as many threads in parallel too, as I can combine the findings of each thread in a future merge stage.

 /**
  * we have to merge several definitions
  * @param {object} pack (stages, stageIndex, chunkIndex)
  * @return {object} updated chunk
  */
  ns.defineFields = function (pack) {
   
    // get the reduced stage - all the data
    const stage = pack.stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    // we're going to enhance this arg, so take a copy
    const arg = Utils.clone(stage.instances.arg);
    
    // the results from the previous stage (or I could search for its name)
    const prev = pack.stages[pack.stageIndex -1];

    // normally prev.result would contain the reduced data from the previous stage 
    // but this takes care of the chunks being different sizes
    arg.data = Server.getSlice (prev ,chunk.start , chunk.chunkSize + chunk.start);
    
    // look at all the data and figure out the field types
    const result = FusionToDB.defineFields (arg);
    
    
    // register for this chunk
    return Server.registerChunk (stage, chunk, result, 1);
  
  };

Of interest here are
  • how to get the previous stage
 const prev = pack.stages[pack.stageIndex -1];
  • how to get data from a previous stage - when the reduce operation has been skipped. It may be that the number of threads in this stage is different than the stage from which we're getting the data, but Server.getSlice takes care of that by scanning across several previous chunks if required.
arg.data = Server.getSlice (prev ,chunk.start , chunk.chunkSize + chunk.start);

As usual it ends with a call to Server.registerChunk. Here's the dashboard now.
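
The idea of slicing across unreduced chunks can be sketched with plain arrays. getSliceSketch below is a hypothetical stand-in and not the real Server.getSlice. It assumes each chunk records its start position and its own result array, and that the end position is exclusive, which matches the chunk.start + chunk.chunkSize usage above.

```javascript
// Hypothetical stand-in for Server.getSlice: collect items [start, finish)
// from a stage whose results are still spread across its chunks.
function getSliceSketch (stage, start, finish) {
  return stage.chunks.reduce(function (items, chunk) {
    const chunkEnd = chunk.start + chunk.result.length;
    // skip chunks that don't overlap the requested range
    if (chunkEnd <= start || chunk.start >= finish) return items;
    // translate the overall positions into positions within this chunk
    const from = Math.max(start, chunk.start) - chunk.start;
    const to = Math.min(finish, chunkEnd) - chunk.start;
    return items.concat(chunk.result.slice(from, to));
  }, []);
}

// a made-up previous stage with three chunks of different sizes
const prevSketch = { chunks: [
  { start: 0, result: ["a", "b", "c"] },
  { start: 3, result: ["d", "e"] },
  { start: 5, result: ["f", "g", "h"] }
]};

// items 2 to 5 span all three chunks
const sliced = getSliceSketch(prevSketch, 2, 6);
```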

Merge fields stage

Now I have 24 different interpretations of the field definitions to consolidate down to 1, the details of which are below but unimportant.
/**
  * we have to merge several definitions
  * @param {object} pack (stages, stageIndex, chunkIndex)
  * @return {object} updated chunk
  */
  ns.mergeFields = function (pack) {
  
    // get the reduced stage - all the data
    const stage = pack.stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    
    // the previous stage (or I could search for its name)
    const prev = pack.stages[pack.stageIndex -1];
    
    // get all the data from the previous stage
    const reduced = Server.getReduced (prev);
    
    
    // merge the fields
    const fields = reduced.result.reduce (function (p,c) {
      Object.keys(c)
      .forEach (function(k) {
        if (!p[k]) {
          p[k] = c[k];
        }
        else {
          /// merge the field definition
          if (c[k].type !== p[k].type) {
            // discovered mixed types
            if (p[k].type === "INT" && c[k].type === "FLOAT") {
              p[k].type = "FLOAT";
            }
            else {
              p[k].type = "STRING";
            }
          }
          // size and null counts need combining whether or not the types matched
          p[k].size = Math.max (p[k].size , c[k].size);
          p[k].nulls += c[k].nulls;
        }
      });
      return p;
    },{});
    
    // write that chunk
    return Server.registerChunk (stage, chunk, fields);
  };

Of interest here is
  • how to get the reduced data (all the chunks combined) from the previous stage
 const prev = pack.stages[pack.stageIndex -1];
 const reduced = Server.getReduced (prev);

As usual it ends with a call to Server.registerChunk. Here's the dashboard now.
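
As a mental model of what a reduce does, the sketch below joins the results of every chunk of a stage, in chunk order, into one result array. getReducedSketch is hypothetical and is not the real Server.getReduced; the chunk shape is made up.

```javascript
// Hypothetical stand-in for Server.getReduced: all chunk results in one array.
function getReducedSketch (stage) {
  // make sure the chunks are combined in chunk order, without reordering the original
  const sorted = stage.chunks.slice().sort(function (a, b) {
    return a.chunkIndex - b.chunkIndex;
  });
  return {
    result: sorted.reduce(function (all, chunk) {
      // concat flattens an array result, and appends a single object as one item
      return all.concat(chunk.result);
    }, [])
  };
}

// chunks may finish in any order
const finished = { chunks: [
  { chunkIndex: 1, result: [{ name: { type: "STRING", size: 12, nulls: 0 } }] },
  { chunkIndex: 0, result: [{ name: { type: "STRING", size: 8, nulls: 1 } }] }
]};
const reducedSketch = getReducedSketch(finished);

// a stage that registered a single object rather than an array
const single = getReducedSketch({ chunks: [{ chunkIndex: 0, result: { id: { type: "INT" } } }] });
```

With this model, a stage that registers one object ends up with that object at result[0] of the reduced data.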

Define table stage

I have enough now to create the first final deliverable - an SQL definition of the table based on the field definitions derived from scanning the data contents.
  /**
  * define table
  * @param {object} pack (stages, stageIndex, chunkIndex)
  * @return {object} updated chunk
  */
  ns.defineTable = function (pack) {
    
    // get the reduced stage - all the data
    const stages = pack.stages;
    const stage = stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    const arg = Utils.clone(stage.instances.arg);
    
    // the previous stage (or I could search for its name)
    const prev = pack.stages[pack.stageIndex -1];
    
    // get all the data from the previous stage
    arg.typeDefs = Server.getReduced (prev).result[0];

    // do the work for this stage
    const result = FusionToDB.defineTable (arg);
    
    // find the stage that made the data - as that's the input 
    const dataStage = Server.findStage ("getData" , stages);
    
    // write that chunk - should only be 1 - and set up the data length as input to next stage
    return Server.registerChunk (stage, chunk, result, dataStage.resultLength);
  
  };

Of interest here is
  • how to find a previous stage by name
 const dataStage = Server.findStage ("getData" , stages);

As usual it ends with a call to Server.registerChunk, but notice that the resultCount is based on the results from a stage a few back - the getData stage. This can be signalled by finding the getData stage as above, and passing its resultLength to Server.registerChunk as the last argument.
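
Finding a stage by name amounts to a lookup on the stage property of each entry in the work package. A minimal sketch, where findStageSketch is a hypothetical stand-in for Server.findStage and the stages and their resultLength values are made up:

```javascript
// Hypothetical stand-in for Server.findStage: look a stage up by its name.
function findStageSketch (name, stages) {
  const matches = stages.filter(function (s) {
    return s.stage === name;
  });
  if (!matches.length) throw new Error("stage " + name + " not found");
  return matches[0];
}

// made-up stages, using the stage names from this article
const stagesSketch = [
  { stage: "getCount", resultLength: 1 },
  { stage: "getData", resultLength: 1000000 },
  { stage: "defineFields", resultLength: 24 }
];

// the count to hand on comes from a stage a few back, not the previous one
const dataLength = findStageSketch("getData", stagesSketch).resultLength;
```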


SqlInserts stage

Next I need to go back to the actual data and create SQL insert scripts to insert this data into an SQL database in optimized chunks. Note that the data input for this stage is not from the previous stage, but from one a few steps ago.
 /**
  * make sql import statement
  * @param {object} pack (stages, stageIndex, chunkIndex)
  * @return {object} updated chunk
  */
  ns.makeSqlStatements = function (pack) {

    // get the reduced stage - all the data
    const stages = pack.stages;
    const stage = stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    
    // we're going to enhance this arg, so take a copy
    const arg = Utils.clone(stage.instances.arg);
    
    // find the stage that made the data - as that's the input 
    const dataStage = Server.findStage (stage.dataCount , stages);
    
    // get the chunk of data that matches what needs to be processed in this chunk
    arg.data = Server.getSlice (dataStage ,chunk.start , chunk.chunkSize + chunk.start);

    // find the type def
    const mergeStage = Server.findStage ("mergeFields" , stages);
    const mergeReduced = Server.getReduced (mergeStage);

    // add typedefs & data to the args
    arg.typeDefs = mergeReduced.result[0];

    // its possible to skip within the chunk
    
    const files = [];
    arg.skip = arg.skip || 0;

    // loop around at optimized sql sizes
    const stms = [];
    do {
      
      var valChunk = FusionToDB.makeValues (arg);
     
      if (valChunk.length) {
        
        // keep the values for this batch
        arg.values = valChunk;

        // make SQL inserts
        arg.inserts = FusionToDB.makeInserts (arg);
    
        // write to a sequence of inserts
        stms.push(FusionToDB.makeInsertSql (arg));
        
        arg.skip += valChunk.length;
        
      }
    
    } 
    while (valChunk.length);
    
    // write that chunk - and set up the data length as input to next stage
    return Server.registerChunk (stage, chunk, stms);
  
  };

Of interest here are
  • referencing the dataCount property to find the input stage. The work package for this stage has dataCount set to getData.
  const dataStage = Server.findStage (stage.dataCount , stages);
  • using the results of more than one previous stage. This stage needs both the data stage (which was not reduced) and the mergeFields stage (which was).
    // find the type def
    const mergeStage = Server.findStage ("mergeFields" , stages);
    const mergeReduced = Server.getReduced (mergeStage);

This ends with Server.registerChunk and the 1m records reduced to 96 SQL insert statements.
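
The skip and do/while pattern in this stage can be tried out with plain arrays. In the sketch below makeBatches is a hypothetical helper, and a fixed batchSize stands in for whatever FusionToDB.makeValues uses to decide how many rows fit in one optimized insert.

```javascript
// Hypothetical batching helper: the same skip / do-while loop, on plain arrays.
function makeBatches (rows, batchSize) {
  const batches = [];
  var skip = 0;
  var batch;
  do {
    // take the next batch, starting where the last one finished
    batch = rows.slice(skip, skip + batchSize);
    if (batch.length) {
      batches.push(batch);
      skip += batch.length;
    }
  } while (batch.length);   // an empty batch means everything has been consumed
  return batches;
}

const batched = makeBatches([1, 2, 3, 4, 5], 2);
```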

Make SQL scripts


Next the SQL insert statements need to be combined into SQL scripts optimized for the target database, and written to Drive.
ns.makeSqlScripts = function (pack) {

    // get the reduced stage - all the data
    const stages = pack.stages;
    const stage = stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    
    // we're going to enhance this arg, so take a copy
    const arg = Utils.clone(stage.instances.arg);
    
    // get the prev stage
    const prev = stages[stage.stageIndex -1];
    arg.sql =  Server.getSlice (prev ,chunk.start , chunk.chunkSize + chunk.start);
    
    // filename starts at chunk start + some offset if given
    arg.fileNameStart = (arg.fileNameStart || 0 ) + chunk.start ;
    arg.fileNameStem = (arg.fileNameStem || ("sqlins-" + arg.db + "-" + arg.table + "-") ) + zeroPad(chunk.chunkIndex,3) + "-";
    arg.fileExtension = arg.fileExtension || ".sql";

    arg.folderId = arg.folderId || DriveApp.getRootFolder().getId();
    var folder = DriveApp.getFolderById(arg.folderId);
    if (!folder) throw 'folder for id ' + arg.folderId + ' not found';

    // create the script file once, with the joined statements as its content
    const file = folder.createFile(arg.fileNameStem+zeroPad(arg.fileNameStart,6)+arg.fileExtension, arg.sql.join("\n"), "text/plain");


    // write that chunk
    return Server.registerChunk (stage, chunk, file.getId());
  };

There's nothing new here, and the stage ends with a Server.registerChunk, and just 7 script files created.
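
The file naming in this stage relies on a zeroPad helper that isn't listed in this article. Here is a minimal sketch of what such a helper and the resulting file name could look like. zeroPadSketch and scriptFileName are hypothetical, and the db and table values are made up.

```javascript
// Hypothetical version of the zeroPad helper: left pad a number with zeros.
function zeroPadSketch (n, width) {
  var s = String(n);
  while (s.length < width) s = "0" + s;
  return s;
}

// Builds the file name the same way as the stage does, without modifying arg.
function scriptFileName (arg, chunk) {
  const stem = (arg.fileNameStem || ("sqlins-" + arg.db + "-" + arg.table + "-")) +
    zeroPadSketch(chunk.chunkIndex, 3) + "-";
  const start = (arg.fileNameStart || 0) + chunk.start;
  return stem + zeroPadSketch(start, 6) + (arg.fileExtension || ".sql");
}

// made-up values: the third chunk, whose first statement is number 28
const fileName = scriptFileName({ db: "perf", table: "runs" }, { chunkIndex: 2, start: 28 });
// fileName is "sqlins-perf-runs-002-000028.sql"
```

Padding both numbers keeps the files in the right order when they are listed by name.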

Zip script files

Finally these scripts need to be combined into a single zip file, ready to be downloaded to their target server.
ns.makeSqlScripts = function (pack) {

    // get the reduced stage - all the data
    const stages = pack.stages;
    const stage = stages[pack.stageIndex];
    const chunk = stage.chunks[pack.chunkIndex];
    
    // we're going to enhance this arg, so take a copy
    const arg = Utils.clone(stage.instances.arg);
    
    // get the prev stage
    const prev = stages[stage.stageIndex -1];
    arg.sql =  Server.getSlice (prev ,chunk.start , chunk.chunkSize + chunk.start);
    
    // filename starts at chunk start + some offset if given
    arg.fileNameStart = (arg.fileNameStart || 0 ) + chunk.start ;
    arg.fileNameStem = (arg.fileNameStem || ("sqlins-" + arg.db + "-" + arg.table + "-") ) + zeroPad(chunk.chunkIndex,3) + "-";
    arg.fileExtension = arg.fileExtension || ".sql";

    arg.folderId = arg.folderId || DriveApp.getRootFolder().getId();
    var folder = DriveApp.getFolderById(arg.folderId);
    if (!folder) throw 'folder for id ' + arg.folderId + ' not found';

    // create the script file once, with the joined statements as its content
    const file = folder.createFile(arg.fileNameStem+zeroPad(arg.fileNameStart,6)+arg.fileExtension, arg.sql.join("\n"), "text/plain");


    // write that chunk
    return Server.registerChunk (stage, chunk, file.getId());
  };


Wrap up

You may want to report some kind of result. By default, the final reduced result is output but this may not be appropriate. main.js.html can be modified if necessary to report the result.  


This is the only tweaking you need to do to the Client side code, and I may improve this in future versions.
window.onload = function() {

  // set up client app structure
  App.initialize();
  Home.init();

  
  // get the ui going
  Client.init()
    .then (function (result) {
      // get the final result
      return Provoke.run("Server","getReduced",result.stage)
        .then (function (reduced) {
          Render.showResult (JSON.stringify(reduced.result));
          App.toast (result.work.title,"completed");
        })
    })
    ['catch'](function(err) {
      App.showNotification ("work failed",  err);
    });

};
