#!/usr/bin/perl
#
my $USAGE = "#
# Usage: $0 [ -v verbosity ] [ -s ] url_or_file
# Where:
#   -v num       Set verbosity (default level = 10)
#   -s           Disable SSL cert checks
#   url_or_file  Either a filename or URL to the Beat JSON/YAML config that is sent to Elastic for ingest pipelines
#
# The converted configuration is sent to STDOUT
#
# Example invocations:
#  $0 pipeline.yml                           # Assumes that 'pipeline.yml' exists in the current directory
#  $0 module/iis/access/ingest/pipeline.yml  # Assumes the file exists in the specified directory
#  $0 https://raw.githubusercontent.com/elastic/beats/master/filebeat/module/iis/error/ingest/pipeline.yml
#  $0 https://raw.githubusercontent.com/elastic/beats/master/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml
#
" . '# Version: $Id: ingest-beat2logstash,v 2.12 2021/05/06 16:39:45 dan Exp $' . "
#
";
#

use strict;

##  DON'T use the standard YAML module: some Beat YML files aren't compatible in the old YAML format, and YAML.pm chokes.
#use YAML;
use Carp;
use JSON;
use YAML::XS;
use Getopt::Std;
use Data::Dumper;
use LWP::UserAgent;

my %opt;
getopts('sv:', \%opt) || die $USAGE;
my $verbosity  = $opt{'v'} // 10;
my $nosslcheck = $opt{'s'};
my $spacesperindent = 3;
die "Error: Invalid number of parameters (@ARGV)!  Died" unless ($#ARGV == 0);


my %processor = (

  "append"     => \&process_set,
  "convert"    => \&process_convert,
  "date"       => \&process_date,
  "geoip"      => \&process_geoip,
  "grok"       => \&process_grok,
  "gsub"       => \&process_gsub,
  "kv"         => \&process_kv,
  "remove"     => \&process_remove,
  "rename"     => \&process_rename,
  "set"        => \&process_set,
  "split"      => \&process_split,
  "urldecode"  => \&process_urldecode,
  "user_agent" => \&process_useragent,

);


my $url = shift @ARGV;
my $input;
if ($url =~ m|^https?://|)
{
  my $ua = LWP::UserAgent->new();
  if ($nosslcheck)
  {
    $ua->ssl_opts( 'verify_hostname' => 0 );
    ## Setting PERL_LWP_SSL_VERIFY_HOSTNAME to 0 before starting this script is equivalent
  }
  else
  {
    if (!defined $ENV{'PERL_LWP_SSL_CA_FILE'})
    {
      if (-f '/etc/ssl/certs/ca-certificates.crt')
      { $ua->ssl_opts( 'SSL_ca_file' => '/etc/ssl/certs/ca-certificates.crt' ) }
      elsif ($ENV{'PERL_LWP_SSL_VERIFY_HOSTNAME'} ne '0')
      { printf STDERR "WARNING: /etc/ssl/certs/ca-certificates.crt not found and env PERL_LWP_SSL_CA_FILE not set.  SSL checks may fail!\n"; }
    }
  }

  printf STDERR "DEBUG: Loading URL %s\n", $url
    if ($verbosity >= 20);
  my $resp = $ua->get($url) || die "Failed to pull ${url}: $!";
  printf STDERR "DEBUG: URL get result: %s\n", $resp->content
    if ($verbosity >= 60);
  die "Error: failed to pull ${url}\n" . $resp->content
    if ($resp->content =~ /^Can't verify SSL peers/);
  $input = $resp->content;
}
else
{
  printf STDERR "DEBUG: Reading in file %s\n", $url
    if ($verbosity >= 20);
  my $osep = $/;
  $/ = undef;
  open(INPUT, $url) || die "Error: unable to open/read ${url}: $!";
  $input = <INPUT>;
  close(INPUT);
  $/ = $osep;
}

my $struct;
if ($input =~ /^\s*{/)
{
  ## JSON data
  $struct = decode_json($input) || die "Error: Failed to load JSON data from ${url}! $!";
}
else
{
  ## YAML data
  $struct = Load($input) || die "Error: Failed to load YAML data from ${url}! $!";
}
printf STDERR "DEBUG: %s\n", Dumper($struct)
  if ($verbosity >= 90);



open(OUT, '>&STDOUT');
printf OUT "#\n# Description: %s\n#\n\n", $struct->{'description'};
print  OUT "filter {\n";
foreach my $chunk (@{$struct->{'processors'}})
{
  printf STDERR "DEBUG: Processing section %s\n", Dumper($chunk)
    if ($verbosity >= 50);
  my $action = getKey($chunk);
  if (defined $processor{$action})
  {
    printf STDERR "DEBUG: Invoking processor %s\n", $action
      if ($verbosity >= 30);
    my $str = $processor{$action}($chunk->{$action});

    foreach my $tag (keys %{$chunk->{$action}})
    {
      next if ($tag eq '_cmuHandle');
      next if (defined $chunk->{$action}{'_cmuHandle'}{$tag});
      printf OUT "%s## Parameter %s NOT handled in the following module!\n", (' 'x $spacesperindent), $tag;
    }
    printf OUT "%s\n", doIndent($str, 1);
  }
  else
  {
     printf OUT "### Skipping processing option: %s\n", $action;
     foreach my $k (sort keys %{$chunk->{$action}})
     {
       my $ptr = $chunk->{$action}{$k};
       my $str = ref($ptr) ? encode_json($ptr) : $ptr;
       $str =~ s/\n/\n#######/g;
       printf OUT "#### %s: %s\n", $k, $str;
     }
   }

}
print  OUT "}\n";
close(OUT);



#---------------------------------------------------------------------

sub getKey($)
{
  my $ptr = shift;
  my @list = keys %{$ptr};

  return $list[0] if ($#list == 0);
  my $info = Dumper($ptr);
  die "Error: expected a single key!  Got ($info).  Died";
}

#---------------------------------------------------------------------
# Convert an Elastic field name to a Logstash field name (takes a string OR an array)
# event.ingested => [event][ingested]

sub convert2squares($)
{
  my $field = shift;

  if (ref($field) eq 'ARRAY')
  { return (map { convert2squares($_) } @{$field}); }

  croak "Error: null/empty field passed to convert2square!  Died" if ($field eq '');
  return $field if ($field eq 'null');
  #return $field unless ($field =~ '\.'); ## Always put squares around the field name.
  return sprintf('[%s]', join('][', split(/\./, $field)));
}

#---------------------------------------------------------------------
# Convert an Elastic value to its Logstash equivalent (takes a string)

sub convert2value($)
{
  my $val = shift;
  croak "Error: undefined field passed to convert2value!  Died" unless (defined $val);

  my $buf = '';
  my $tmp = $val;
  do
  {
    if ($tmp =~ /^([^{]*){{\s*([^}]*[^ }])\s*}}(.*)$/)
    {
      $buf .= $1;
      my $field = $2;
      $tmp = $3;

      $buf .= sprintf('%%{%s}', convert2squares($field));
    }
    else
    {
      $buf .= $tmp;
      $tmp  = '';
    }
  }
  while ($tmp ne '');

  return $buf;
}

#---------------------------------------------------------------------
# Convert Elastic regexes to Logstash regexes (the field names in particular)

sub convert2logstashgrok($)
{
  my $ptr = shift;
  my @results;
  foreach my $pat (@{$ptr})
  {
    my $tpat = $pat;
    chomp($tpat);
    my $buf = '';
    do
    {
      if ($tpat =~ /^([^%]+)(.*)$/)
      {
        $buf .= $1;
        $tpat = $2;
      }
      elsif ($tpat =~ /^\%{([A-Z0-9_]+):([A-Z0-9_.]+)(:([A-Z0-9_.]+))?}(.*)/i)
      {
        my $PAT = $1;
        my $var = $2;
        my $cast = $4;

        ## 'long' is an invalid type for Grok/logstash
        $cast = 'int' if ($cast eq 'long');
        $cast = 'int' if ($cast eq 'integer');
        $tpat = $5;
        $buf .= sprintf('%{%s:%s%s}',
           $PAT,
           convert2squares($var),
           ((defined $cast) ? ":${cast}" : ''));
      }
      elsif ($tpat =~ /^\%{([A-Z0-9_]+)}(.*)/i)
      {
        my $PAT = $1;

        $tpat = $2;
        $buf .= sprintf('%{%s}', $PAT);
      }
      elsif ($tpat =~ /^[^%]*$/)
      {
        $buf .= $tpat;
        $tpat = '';
      }
      else
      { die "Error: Tried to parse pattern '${pat}' but failed at '${tpat}'!  Died"; }
    }
    while ($tpat ne '');

    # Escape any double quotes seen in the string
    $buf =~ s/"/\\"/g;
    push(@results, sprintf('"%s"', $buf));
  }

  return @results;
}

#---------------------------------------------------------------------
#  Escape double-quotes (and maybe backslashes at some point?) if seen

sub escape($)
{
  my $str = shift;
  $str =~ s/"/\\"/g;
  return $str;
}

#---------------------------------------------------------------------
#  Put square brackets around top-level variables -- this is for filter expressions

sub filterQuote($)
{
  my $str = shift;
  return $str if ($str =~ /^\[.*\]$/);
  return "[$str]";
}

#---------------------------------------------------------------------
#  Properly fix indentation after newlines, accounting for nesting levels

sub doIndent($;$)
{
  my $str = shift;
  my $startlevel = shift // 0;

  $str =~ s/^[\s]+//;
  $str =~ s/[\s]+$//;

  my @expect;
  my @newstr = ();
  my $curlevel = 0;
  my $newline = 1;

  ##
  ## States:
  ##  0  = normal
  ##  1  = '\' was the last char
  ##  2  = eating through whitespace after a newline
  ##  10 = In a double-quoted string
  ##  11 = In a double-quoted string after a '\'
  ##  20 = In a single-quoted string
  ##  21 = In a single-quoted string after a '\'
  ##  30 = after a comment (# comment)
  my $state = 0;
  my $pos = 0;
  my $wasnewline = 0;
  foreach my $i (split(//, $str))
  {
    $pos++;
    $wasnewline = $newline;
    printf STDERR "Loop[%03d]: state=%2s level=%2d char %s stack=%s\n", $pos, $state, $curlevel, $i, join('', @expect)
      if ($verbosity >= 120);
    if ($newline == 1)
    {
      push(@newstr, ' ' x (($curlevel + $startlevel)*$spacesperindent)) if ($state == 0);
      $newline = 0;
    }

    REDO: {
      if ($state == 0)
      {
        if ($i eq '{')
        {
          $curlevel++;
          push(@expect, '}');
        }
        elsif ($i eq '[')
        {
          $curlevel++;
          push(@expect, ']');
        }
        elsif ($i eq '(')
        {
          $curlevel++;
          push(@expect, ')');
        }
        ## { [ (  ## Here for VI using '%' for matching
        elsif (($i eq '}') || ($i eq ']') || ($i eq ')'))
        {
          die "Error: nesting is inside out in string '${str}' at char ${pos}!  Died" if ($#expect < 0);
  
          my $e = pop(@expect);
          die "Error: expected a ${e} bracket but saw a ${i} in ($str) at char ${pos}!  Died" if ($e ne $i);
  
          $curlevel--;
          ## Remove one indent level
          if ($wasnewline)
          {
            pop(@newstr);  ## Get rid of the indent and re-add it
            push(@newstr, ' ' x (($curlevel + $startlevel)*$spacesperindent));
          }
  
        }
        elsif ($i eq ' ')
        {
          $state = 2 if ($wasnewline);
        }
        elsif ($i eq '#')
        {
          $state = 30;
        }
        elsif ($i eq '\\')
        {
          $state = 1;
        }
        elsif ($i eq '"')
        {
          $state = 10;
        }
        elsif ($i eq "'")
        {
          $state = 20;
        }
        elsif ($i eq "\n")
        { $newline = 1; }
        ## Else do nothing -- regular token
      }
      elsif ($state == 1)
      { $state = 0; }
      elsif ($state == 2)
      {
        if ($i ne ' ')
        { $state = 0; goto REDO; }
      }
      elsif ($state == 10)
      {
        if ($i eq '\\')
        { $state = 11; }
        elsif ($i eq '"')
        { $state = 0; }
      }
      elsif ($state == 11)
      { $state = 10; }
  
      elsif ($state == 20)
      {
        if ($i eq '\\')
        { $state = 21; }
        elsif ($i eq "'")
        { $state = 0; }
      }
      elsif ($state == 21)
      { $state = 20; }
      elsif ($state == 30)
      {
        $state = 0 if ($i eq "\n");
      }
      else
      { die "Unhandled state ${state}!  Died"; }
  
      if ($i eq "\n")
      { $newline = 1; }
    }

    push(@newstr, $i) unless ($state == 2);
  }
  push(@newstr, "\n");

  die "Error: missing ($curlevel) closing brackets in: ${str} (NOW HAVE:\n".join('', @newstr)."\n)\n" if ($curlevel != 0);
  return join('', @newstr);
}

#---------------------------------------------------------------------

sub tokenize($)
{
  my $str = shift;
  my $tmp = $str;

  my @parts;
  while ($tmp ne '')
  {
    if ($tmp =~ /^\s*([a-z0-9\?._]+)\s*([\S\s]*)/i)
    {
      $tmp = $2;
      my $var = $1;
      $var =~ s/\?(\.|$)/./g;
      $var =~ s/^ctx\.//;
      if (($tmp =~ /^\(/) && ($var =~ /^(.+)\.([a-z0-9_]+)$/))
      { push(@parts, $1, "[][$2]"); }
      else
      { push(@parts, $var); }
    }
    elsif ($tmp =~ /^\s*([]})[{(])\s*([\S\s]*)$/)
    {
      $tmp = $2;
      push(@parts, $1);
    }
    elsif ($tmp =~ /^\s*([-+*\/<>!=&|]+)\s*([\S\s]*)$/)
    {
      $tmp = $2;
      push(@parts, $1);
    }
    elsif ($tmp =~ /^('[^']*(?<!\\)')\s*([\S\s]*)$/)
    {  ## (?<!\\)  meaning: the previous character is NOT a backslash! (non-consuming)
      $tmp = $2;
      push(@parts, $1);
    }
    elsif ($tmp =~ /^("[^"]*(?<!\\)")\s*([\S\s]*)$/)
    {  ## (?<!\\)  meaning: the previous character is NOT a backslash! (non-consuming)
      $tmp = $2;
      push(@parts, $1);
    }
    else
    { die "Failed to parse string '${str}' at: '${tmp}'!  Died"; }
  }

  return @parts;
}

#---------------------------------------------------------------------
# Given the "source" array, and an offset, return the next NESTED "expression"
#   Offset 0: matched (boolean)
#   Offset 1: how many "chunks" in the source array (to skip over on matches)
#   Offset 2+: chunks of the expression

sub pullExpression($$)
{
  my $array = shift;
  my $start = shift;

  my @parts = ();
  my $depth = 0;
  return (1, 1, $array->[$start]) if ($array->[$start][1] =~ /^[][a-z0-9._]{2,}$/i);
  return (0,0) if ($array->[$start][1] !~ /^[({[]$/);
  for (my $i = $start; $i <= $#{$array}; $i++)
  {
    if ($array->[$i][1] =~ /^[({[]$/)
    { $depth++; }
    elsif ($array->[$i][1] =~ /^[])}]$/)
    {
      $depth--;
      return (0,0) if ($depth < 0);
    }
    push(@parts, $array->[$i]);
    return (1, ($#parts+1), @parts) if ($depth == 0);
  }

  # Incomplete expression
  return (0, 0);
}

#---------------------------------------------------------------------
# Given the "source" array, the position to try matching at, and what to match against
# Return: array
#   Offset 0: matched (boolean)
#   Offset 1: how many "chunks" in the source array (to skip over on matches)
#   Offset 2+: match parameters (like $1, $2, $3, ... in regular expressions, one per non-literal)

sub doesMatch($$$)
{
  my $source = shift;
  my $offset = shift;
  my $pattern = shift;

  my @parts;
  my $pos = $offset;
  my $depth = 0;
  return (0, 0) if ($#{$source}+1 - $offset < ($#{$pattern}+1)/2);

  printf STDERR "DEBUG: doesMatch: have %d source and %d match @ offset %d\n", $#{$source}+1, ($#{$pattern}+1)/2, $offset
    if ($verbosity >= 100);
  my $i = 0;
  for (; ($i <= $#{$pattern}) && ($pos <= $#{$source}); $i += 2, $pos++)
  {
    my $op = $pattern->[$i];
    my $val = $pattern->[$i+1];
    printf STDERR "DEBUG: doesMatch[%d/%d]: Matching %s/%s against %s\n", $i, $pos, $op, $val, $source->[$pos][1]
      if ($verbosity >= 100);
    if ($op eq 'literal')
    {
      return (0, 0) if ($source->[$pos][1] ne $val);
    }
    elsif ($pattern->[$i] eq 're')
    {
      my $val = $pattern->[$i+1];
      return (0, 0) if ($source->[$pos][1] !~ /(${val})/);
      push(@parts, [ 're', $1 ]);
    }
    elsif ($pattern->[$i] eq 'expression')
    {
      my @pieces = pullExpression($source, $pos);
      return (0,0) unless ($pieces[0]);
      printf STDERR "DEBUG: doesMatch: Matched expression at %d (out of %d): %s\n", $i, $#{$pattern}, Dumper(\@pieces)
        if ($verbosity >= 100);
      push(@parts, [ 'expression', [ @pieces[2..$#pieces] ] ]);
      $pos += $pieces[1] - 1;
    }
    else
    { croak "Unhandled match type '${op}'!  Aborting"; }
  }
  return (0, 0) unless ($pos <= ($#{$source} + 1));
  return (0, 0) unless ($i == $#{$pattern} + 1);
  my @result = (1, $pos - $offset, @parts);

  printf STDERR "DEBUG: Expression match was found! (%d chunks long @ offset %d)\nSource: %s\npattern %s\nMatch: %s\n",
    ($pos - $offset), $offset,
    Dumper($source), Dumper($pattern), Dumper(\@result)
    if ($verbosity >= 100);

  ## If we get to here, it matched!
  return (1, $pos - $offset, @parts);
}

#---------------------------------------------------------------------
# Validate the rebuilt expression structure, to make sure the data structures are as expected

sub sanity_check ($)
{
  my $struct = shift;
  if (ref($struct) ne 'ARRAY')
  {
    printf STDERR "ERROR: sanity_check: The pointer did not reference an array! (type: %s)\n",
      ref($struct)
      if ($verbosity >= 10);
    return 1;
  }

  foreach my $chunk (@{$struct})
  {
    if (ref($chunk) ne 'ARRAY')
    {
      printf STDERR "ERROR: sanity_check: The chunklet was not an array! (was: %s)\n",
        ref($chunk)
        if ($verbosity >= 10);
      return 1;
    }
    if ($#{$chunk} < 1)
    {
      printf STDERR "ERROR: sanity_check: chunklet has less than two members! (was: %s) [%s]\n",
        ($#{$chunk}+1), Dumper($chunk)
        if ($verbosity >= 10);
      return 1;
    }
    if (ref($chunk->[0]) ne '')
    {
      printf STDERR "ERROR: sanity_check: First value of chunklet was not a scalar! (was: %s)\n",
        ref($chunk->[0])
        if ($verbosity >= 10);
      return 1;
    }
    if (ref($chunk->[1]) ne '')
    {
      printf STDERR "ERROR: sanity_check: second value of chunklet was not a scalar! (was: %s)\n",
        ref($chunk->[1])
        if ($verbosity >= 10);
      return 1;
    }
  }
  return 0;
}

#---------------------------------------------------------------------
# Given a source "expression", a pattern, and a replacement array, replace any and all expressions in
# the source that match the pattern with the replacement array

sub doReplace($$$)
{
  my $array   = shift;  ## Contents to modify
  my $pattern = shift;  ## Pattern to match
  my $replace = shift;  ## Replacement expression

  die "Error: parm 1 is not an array!  Died" unless (ref($array) eq 'ARRAY');
  die "Error: parm 2 is not an array!  Died" unless (ref($pattern) eq 'ARRAY');
  die "Error: parm 3 is not an array!  Died" unless (ref($replace) eq 'ARRAY');

  my $matched = 0;
  my @before = @{$array};
  for (my $pos = 0; $pos <= $#{$array}; $pos++)
  {
    my @match = doesMatch($array, $pos, $pattern);
    if ($match[0])  ## Match?
    {
      $matched++;
      printf STDERR "DEBUG: doReplace: Match found!\n",
        if ($verbosity >= 40);
      printf STDERR "DEBUG: doReplace match at pos %d!\nMatch = %s\nReplace = %s \n", $pos, Dumper(\@match), Dumper($replace)
        if ($verbosity >= 100);
      my @newarray = ();
      for (my $i = 0; $i < $pos; $i++)
      { push(@newarray, $array->[$i]); }
      for (my $repl = 0; $repl <= $#{$replace}; $repl += 2)
      {
        my $op = $replace->[$repl];
        my $val = $replace->[$repl+1];
        printf STDERR "DEBUG: Performing '%s' replacement chunk\n", $op
          if ($verbosity >= 50);
        if ($op eq 'literal')
        {
          push(@newarray, [ $op, $val ] );
        }
        elsif ($op eq 'expression')  ## Val NEEDS to match $\d+
        {
          if ($val =~ /^\$(\d+)$/)
          {
            my $expnum = ($1) - 1;
            die "Error: position ${expnum} is NOT an expression!  Died" unless ($match[$expnum+2][0] eq 'expression');
            my $expr = $match[$expnum+2][1];
            push(@newarray, @{$expr});
          }
          else
          { die "Error: Expression value '$val' does NOT resemble \$number!  Died"; }
        }
        elsif ($op eq 'function')
        {
          my @add = $val->(@match[2..$#match]);
          push(@newarray, @add);
        }
        else
        { die "Error: Unhandled $replace parameter: '${op}'!  Died"; }
      }
      my $newpos = $#newarray;
      printf STDERR "DEBUG: doReplace: Pos %d: replaced %d chunks with %d chunks\n",
        $pos,
        $match[1],
        ($newpos - $pos + 1) 
        if ($verbosity >= 45);
      for (my $i = $pos + $match[1]; $i <= $#{$array}; $i++)
      {
         push(@newarray, $array->[$i]);
      }

      if (sanity_check(\@newarray) != 0)
      {
        printf STDERR "ERROR: Failed to properly replace something!\nBEFORE: %s\nAFTER: %s\nPattern: %s\nMatch: %s\nReplace: %s\n",
          Dumper($array),
          Dumper(\@newarray),
          Dumper($pattern),
          Dumper(\@match),
          Dumper($replace);
        die "Corrupted replacement";
      }

      ## Skip over what was matched, move back by the number of elements "eaten", and back one because we auto-increment in a moment
      @{$array} = @newarray;
      $pos = $newpos;
    }
  }

  if ($matched && ($verbosity >= 25))
  {
    printf STDERR "DEBUG: BEFORE: %s\n", Dumper(\@before);
    printf STDERR "DEBUG: Match: %s\n", Dumper($pattern);
    printf STDERR "DEBUG: RESULT: %s\n", Dumper($array);
  }
}

#---------------------------------------------------------------------

###
## the following are doReplace function rewriter subroutines
###

sub _rw_repeatmatch()
{
  my $op = shift;
  my $num = shift;

  my $offset = 1;
  if ($op =~ /^(.*)=$/)
  {
    $op = $1;
    $offset = 0;
  }
  return [ 'rpat', sprintf('/^.{0..%d}$/', ($num-$offset)) ] if ($op eq '<');
  return [ 'rpat', sprintf('/^.{%d,}/', ($num+$offset)) ] if ($op eq '>');
  return [ 'rpat', sprintf('/^.{%d}/', $num) ] if ($op eq '=');
  die "Error: unknown operator '${op}'!  Died";
}

#-----

sub _rw_nullcomp()
{
  my $idx = shift;

  my $expr = $_[$idx] // croak "Error: undefined index ($idx) specified!  Died";
  my $op   = $_[$idx+1][1] // croak "Error: undefined index ($idx) specified!  Died";

  return ([ 'other', '!' ]) if ($op eq '==');
  return () if ($op eq '!=');
  croak "Error: Unhandled nullcomp operator '$op'!  Died";
  #my @literals = @{$expr->[1]};
  #return @literals if ($op->[1] eq '!=');
  #return ( [ 'other', '!' ], @literals );
}

#-----

sub _rw_stripparens()
{
  my $idx = shift;

  my $ptr = $_[$idx] // croak "Error: undefined index ($idx) specified!  Died";
  croak "Error: Index ${idx} is not an expression!  Died" unless ($ptr->[0] eq 'expression');
  $ptr = $ptr->[1];

  my @expr;
  my $haveparen = ($ptr->[0][1] eq '(') ? 1 : 0;
  push(@expr, $ptr->[0]) unless ($haveparen);
  for (my $i = 1; $i < $#{$ptr}; $i++)
  { push(@expr, $ptr->[$i]); }
  push(@expr, $ptr->[$#{$ptr}]) unless ($haveparen);
  return @expr;
}

#---------------------------------------------------------------------

sub makelogic($)
{
  my $str = shift;

  ## Pass 1: tokenize and convert variables to the logstash syntax
  my @parts;

  foreach my $chunk (tokenize($str))
  {
    if ($chunk =~ /^[_a-z.?]+$/i)
    {
      $chunk =~ s/\?//g;
      push(@parts, [ 'name', convert2squares($chunk) ]);
    }
    elsif ($chunk eq '')
    { die "Empty token found in: '$str'  Got to: " . Dumper(\@parts); }
    else
    { push(@parts, [ 'other', $chunk ]); }
  }

  my @newparts = @parts;

  ####
  ## Fixups:
  ####

  ##  null comparisons: expression [!=]= null  -> [!] expression
  my $rr = sub { my @c = @_; return &_rw_nullcomp(0, @c); }; 
  ###
  doReplace( \@newparts, [ qw( expression .+ re [=!]= literal null ) ], [ 'function', $rr, 'expression', '$1' ]);


  ## set contains expression: ! EXPR1.contains( EXPR2 ) -> EXPR2 not in EXPR1
  ## set contains expression: EXPR1.contains( EXPR2 ) -> EXPR2 in EXPR1
  my $rr = sub { my @c = @_; return &_rw_stripparens(1, @c); }; 
  doReplace( \@newparts,
     [ qw( literal ! expression .+ literal [][contains] expression .+ ) ],
     [ 'function', $rr, qw( literal not literal in expression $1 ) ]);
  doReplace( \@newparts,
     [ qw( expression .+ literal [][contains] expression .+ ) ],
     [ 'function', $rr, qw( literal in expression $1 ) ]);


#foreach my $c (@newparts) { next unless ($c->[1] eq '[][length]'); $verbosity = 100; } ## DAN DEBUG
#printf STDERR "MARK Before: %s\n", Dumper(\@newparts) if ($verbosity == 100);
  ##  .length operators:  [] [length] ( ) > number  -> expression =~ /^.../
  my $rr = sub { my @c = @_; return ( &_rw_stripparens(0, @c), [ 'literal', '=~' ], &_rw_repeatmatch( $c[1][1], $c[2][1] )); };
  doReplace( \@newparts, [ qw( expression .+ literal [][length] literal ( literal ) re [<>]=? re \d+ ) ], [ 'function', $rr ]);
#die "After: " . Dumper(\@newparts) if ($verbosity == 100);


  ## logic operators: && -> and
  ## logic operators: || -> or
  doReplace( \@newparts, [ qw( literal && ) ], [ qw( literal and ) ]);
  doReplace( \@newparts, [ qw( literal || ) ], [ qw( literal or ) ]);


  my $newstr = join(' ', map { $_->[1] } @newparts);
  printf STDERR "DEBUG: Converted conditional '%s' to '%s'\n", $str, $newstr
    if ($verbosity >= 40);
  return $newstr;
}

#---------------------------------------------------------------------





#---------------------------------------------------------------------
#---------------------------------------------------------------------
#----------- PROCESSOR WRAPPERS --------------------------------------
#---------------------------------------------------------------------
#---------------------------------------------------------------------

sub wrap_OnFailure($$$)
{
  my $core = shift;
  my $ptr = shift;
  my $errtag = shift;

  return $core unless (defined $ptr->{'on_failure'});
  $ptr->{'_cmuHandle'}{'on_failure'} = 1;

  my $before = sprintf ("mutate {\n remove_tag => [\n\"%s\"\n]\n}\n", $errtag);
  if ((defined $ptr->{'ignore_failure'}) && $ptr->{'ignore_failure'})
  {
    $before .= "## NOTICE: ignore_failure is set to true, and on_failure is set as well.\n";
    $before .= "## NOTICE: The ignore_failure action will likely be ignored.\n";
  }
  my $after  = '';
  $after    .= sprintf ("if \"%s\" in [tags] {\n", $errtag);
  my $first = 1;
  my $rmerror = sprintf ("remove_tag => [\n \"%s\"\n]\n", $errtag);
  foreach my $chunk (@{$ptr->{'on_failure'}})
  {
    my $action = getKey($chunk);
    if ($action eq 'append')
    {
      $after .= sprintf ("mutate {\n add_field => {\n\"%s\" => \"%s\"\n}\n%s}\n",
          convert2squares($chunk->{$action}{'field'}),
          convert2value($chunk->{$action}{'value'}),
          $rmerror,
         );
      $rmerror = '';
    }
    else
    { $after .= "### WARNING: Unhandled on_failure action ${action}!\n"; }
  }
  $after .= sprintf ("mutate {\n %s}\n", $rmerror) if ($rmerror);
  $after    .= "}\n";
  return  $before . $core . $after;
}

#---------------------------------------------------------------------

sub wrap_If($$)
{
  my $core = shift;
  my $ptr = shift;

  return $core unless (defined $ptr->{'if'});
  $ptr->{'_cmuHandle'}{'if'} = 1;
  croak "Error: 'if' field defined but empty!  Died" if ($ptr->{'if'} eq '');
  return sprintf("if %s {\n %s}\n", makelogic($ptr->{'if'}), $core);
}

#---------------------------------------------------------------------

sub wrap_IgnoreEmptyValue($$)
{
  my $core = shift;
  my $ptr = shift;

  return $core unless (defined $ptr->{'ignore_empty_value'});
  $ptr->{'_cmuHandle'}{'ignore_empty_value'} = 1;
  #return sprintf("if %s {\n %s}\n", filterQuote(convert2squares($ptr->{'field'})), $core);
  if ($ptr->{'value'} =~ /^{{(.*)}}$/)
  {
    my $field = $1;
    return sprintf("if %s {\n %s}\n", filterQuote(convert2squares($field)), $core);
  }

  return sprintf("### FIXME: the following 'value' needs to be nonempty for the condition to be true\nif %s {\n %s}\n", filterQuote($ptr->{'value'}), $core);
}

#---------------------------------------------------------------------

sub wrap_IgnoreFailure($$$)
{
  my $core = shift;
  my $ptr = shift;
  my $errtag = shift;

  return $core unless (defined $ptr->{'ignore_failure'});
  $ptr->{'_cmuHandle'}{'ignore_failure'} = 1;
  return $core . sprintf("mutate {\n remove_tag => [\n \"%s\"\n]\n}\n", $errtag) if ($ptr->{'ignore_failure'});
  return $core . sprintf("if \"%s\" in [tags] {\n drop { }\n}\n", $errtag);
}

#---------------------------------------------------------------------

sub wrap_IgnoreMissing($$)
{
  my $core = shift;
  my $ptr = shift;

  return $core unless (defined $ptr->{'ignore_missing'});
  $ptr->{'_cmuHandle'}{'ignore_missing'} = 1;
  return sprintf("if %s {\n %s}\n", filterQuote(convert2squares($ptr->{'field'})), $core);
}

#---------------------------------------------------------------------








#---------------------------------------------------------------------
#---------------------------------------------------------------------
#----------- PROCESSORS ----------------------------------------------
#---------------------------------------------------------------------
#---------------------------------------------------------------------

#---------------------------------------------------------------------

sub helper_convertfield($)
{
  my $type = shift;
  return 'integer"   ## Remapped for logstash.  Former type: long' if ($type eq 'long');
  return $type;
}

sub process_convert($)
{
  my $ptr = shift;

  my $fname = (defined $ptr->{'target_field'}) ? $ptr->{'target_field'} : $ptr->{'field'};
  my $str = '';
  $str .= sprintf ("mutate {\nadd_field => {\n\"%s\" => \"%s\"\n}\n}\n",
        convert2squares($fname),
        convert2squares($ptr->{'field'})) if (defined $ptr->{'target_field'});
  $str .= sprintf ("mutate {\nconvert => {\n\"%s\" => \"%s\"\n}\n}\n",
        convert2squares($fname),
        helper_convertfield($ptr->{'type'}));
  
  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'type'} = 1;
  $ptr->{'_cmuHandle'}{'target_field'} = 1;
  return wrap_If( wrap_IgnoreMissing( $str, $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_date($)
{
  my $ptr = shift;

  my $str = sprintf ("date {\nmatch => [\n\"%s\",\n%s\n]\n",
        convert2squares($ptr->{'field'}),
        join(", ", map { sprintf('"%s"', $_) } @{$ptr->{'formats'}}));
  $str .= sprintf ("target => \"%s\"\n", convert2squares($ptr->{'target_field'}));
  $str .= sprintf ("timezone => \"%s\"\n", convert2value($ptr->{'timezone'})) if ($ptr->{'timezone'} ne '');
  $str .= "}\n";

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'formats'} = 1;
  $ptr->{'_cmuHandle'}{'target_field'} = 1;
  $ptr->{'_cmuHandle'}{'timezone'} = 1;
  return wrap_If( wrap_IgnoreMissing( wrap_IgnoreFailure( wrap_OnFailure($str, $ptr, '_dateparsefailure'), $ptr, '_grokparsefailure'), $ptr), $ptr );
}

#---------------------------------------------------------------------

sub helper_geoipmapfield($)
{
  my $field = shift;
  return 'autonomous_system_number' if ($field eq 'asn');
  return 'organization' if ($field eq 'organization_name');
  return $field;
}

sub process_geoip($)
{
  my $ptr = shift;

  my $str = sprintf("geoip {\nsource => \"%s\"\ntarget => \"%s\"\n",
        convert2squares($ptr->{'field'}),
        convert2squares($ptr->{'target_field'}));
  if ($ptr->{'database_file'} ne '')
  {
    my $dbfile = $ptr->{'database_file'};
    $str .= sprintf("database => \"%s\"\n", $dbfile);
    if ($dbfile =~ /ASN/i)
    { $str .= "default_database_type => \"ASN\"\n"; }
  }
  if (defined $ptr->{'properties'})
  {
    $str .= sprintf("fields => [\n\"%s\"\n]\n", join("\",\n\"", map { helper_geoipmapfield($_) } @{$ptr->{'properties'}}));
  }
  $str .= "}\n";

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'database_file'} = 1;
  $ptr->{'_cmuHandle'}{'properties'} = 1;
  $ptr->{'_cmuHandle'}{'target_field'} = 1;
  return wrap_If( wrap_IgnoreMissing( $str, $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_grok($)
{
  my $ptr = shift;

  my @patterns = convert2logstashgrok($ptr->{'patterns'});
  my $str = sprintf ("grok {\nmatch => {\n\"%s\" => ",
        convert2squares($ptr->{'field'}));
  $str .= ($#patterns == 0) ? $patterns[0] . "\n" : sprintf("[\n%s\n]\n", join(",\n", @patterns));
  $str .= "}\n";

  $str .= "remove_tag => [ \"_grokparsefailure\" ]\n" if ($ptr->{'ignore_failure'});
  if (defined $ptr->{'pattern_definitions'})
  {
    $str .= "pattern_definitions => {\n";
    foreach my $handle (keys %{$ptr->{'pattern_definitions'}})
    { $str .= sprintf("\"%s\" => \"%s\"\n", $handle, $ptr->{'pattern_definitions'}{$handle}); }
    $str .= "}\n";
  }

  $str .= "}\n";

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'patterns'} = 1;
  $ptr->{'_cmuHandle'}{'pattern_definitions'} = 1;
  return wrap_If( wrap_IgnoreMissing( wrap_IgnoreFailure( wrap_OnFailure($str, $ptr, '_grokparsefailure'), $ptr, '_grokparsefailure'), $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_gsub($)
{
  my $ptr = shift;

  ## NOTE TO SELF: \\ gets converted to \    -- I'm not sure if that's good or not!
  my $str = sprintf ("mutate {\ngsub => [\n\"%s\", \"%s\", \"%s\"\n]\n}\n",
        convert2squares($ptr->{'field'}),
        $ptr->{'pattern'},
        convert2value($ptr->{'replacement'}));

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'pattern'} = 1;
  $ptr->{'_cmuHandle'}{'replacement'} = 1;
  return wrap_If( wrap_IgnoreMissing( wrap_OnFailure($str, $ptr, '_mutate_error'), $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_kv($)
{
  my $ptr = shift;

  my $str = '';
  $str .= sprintf ("kv {\nsource => \"%s\"\n", convert2squares($ptr->{'field'}));
  $str .= sprintf ("field_split_pattern => \"%s\"\n", $ptr->{'field_split'}) if (defined $ptr->{'field_split'});
  $str .= sprintf ("value_split => \"%s\"\n", $ptr->{'value_split'}) if (defined $ptr->{'value_split'});
  $str .= sprintf ("prefix => \"%s\"\n", $ptr->{'prefix'}) if (defined $ptr->{'prefix'});
  $str .= sprintf ("trim_value => \"%s\"\n", escape($ptr->{'trim_value'})) if (defined $ptr->{'trim_value'});
  $str .= sprintf ("remove_tag => \"_kv_filter_error\"\n") if ($ptr->{'ignore_failure'});
  $str .= "}\n";

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'field_split'} = 1;
  $ptr->{'_cmuHandle'}{'value_split'} = 1;
  $ptr->{'_cmuHandle'}{'prefix'} = 1;
  $ptr->{'_cmuHandle'}{'trim_value'} = 1;
  return wrap_If( wrap_IgnoreMissing( wrap_IgnoreFailure(wrap_OnFailure($str, $ptr, '_kv_filter_error'), $ptr, '_kv_filter_error'), $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_remove($)
{
  my $ptr = shift;

  ## I'm hoping this is a no-op if the field is missing
  my $str = sprintf ("mutate {\nremove_field => [\n\"%s\"\n]\n}\n",
        join('", "', convert2squares($ptr->{'field'})));

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'ignore_missing'} = 1;  ## Default
  $ptr->{'_cmuHandle'}{'ignore_failure'} = 1;  ## Default
  return wrap_If( $str, $ptr );
}

#---------------------------------------------------------------------

sub process_rename($)
{
  my $ptr = shift;

  my $str = "## Caveat emptor: Unlike ES, Logstash treads dotted names and object names differently!  If the following is a dotted prefix, Logstash won't rename it like Elastic does.\n";
  $str .= sprintf ("mutate {\nrename => {\n\"%s\" => \"%s\"\n}\n}\n",
        convert2squares($ptr->{'field'}),
        convert2squares($ptr->{'target_field'}));

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'target_field'} = 1;
  return wrap_If( wrap_IgnoreMissing($str, $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_set($)
{
  my $ptr = shift;

  my $str = sprintf ("mutate {\nadd_field => {\n\"%s\" => \"%s\"\n}\n}\n",
        convert2squares($ptr->{'field'}),
        convert2value($ptr->{'value'}));

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'value'} = 1;
  return wrap_If( wrap_IgnoreEmptyValue($str, $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_split($)
{
  my $ptr = shift;

  my $str;
  printf STDERR "NOTICE: The 'split' operator in ES supports regexes, while the Logstash version doesn't. Caveat emptor!\n"
    if ($verbosity >= 5);
  if (defined $ptr->{'separator'} && ($ptr->{'separator'} =~ /[\\*?.]/))
  {
    my $tgt = (defined $ptr->{'target_field'}) ? $ptr->{'target_field'} : convert2squares($ptr->{'field'});
    $str .= sprintf ("ruby {\n code => 'event.set(\"%s\", event.get(\"%s\").split(/%s/)'\n}\n",
                $tgt,
                convert2squares($ptr->{'field'}),
                $ptr->{'separator'});
 
  }
  else
  {
    $str .= sprintf ("split {\n field => \"%s\"\n", convert2squares($ptr->{'field'}));
    $str .= sprintf (" terminator => \"%s\"\n", $ptr->{'separator'}) if (defined $ptr->{'separator'});
    $str .= sprintf (" target => \"%s\"\n", $ptr->{'target_field'}) if (defined $ptr->{'target_field'});
    $str .= "}\n";
  }

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'separator'} = 1;
  $ptr->{'_cmuHandle'}{'target_field'} = 1;
  $ptr->{'_cmuHandle'}{'ignore_failure'} = 1;  ## No-op on Logstash
  return wrap_If( wrap_IgnoreEmptyValue($str, $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_urldecode($)
{
  my $ptr = shift;

  my $str = sprintf ("urldecode {\nfield => \"%s\"\n}\n", convert2squares($ptr->{'field'}));

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  return wrap_If( wrap_IgnoreMissing( wrap_OnFailure($str, $ptr, '_urldecodefailure'), $ptr), $ptr );
}

#---------------------------------------------------------------------

sub process_useragent($)
{
  my $ptr = shift;

  my $str = sprintf ("useragent {\nsource => \"%s\"\ntarget => \"[user_agent]\"\n}\n", convert2squares($ptr->{'field'}));

  ## Mark fields that we handle -- the calling process can/will check for dangling fields and warn about them
  $ptr->{'_cmuHandle'}{'field'} = 1;
  $ptr->{'_cmuHandle'}{'ignore_failure'} = 1;   ## No-op under logstash
  return wrap_If( wrap_IgnoreMissing( $str, $ptr), $ptr );
}

#---------------------------------------------------------------------
